From c1cb7648eb7c16cd97ce01e9701fd183df3f17f1 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Fri, 18 Jan 2019 14:17:32 -0500 Subject: [PATCH 1/5] pubsub: dynamically choose the number of messages for ReceiveBatch To decide how many messages to pull at a time, we aim for the in-memory queue of messages to be a certain size. That gives us a buffer of messages to draw from, ensuring high throughput, without pulling so many messages that the unconsumed ones languish. We measure the size by time instead of message count. Time is more relevant, because ack deadlines are expressed in time, and it's easier to think about lost work (in the event of a crash) in terms of time lost rather than messages lost. We keep track of the average time it takes to process a message. Then we can convert a queue size in time to a number of messages. We compute processing time by measuring the time between when Receive returns and when it is next called. Although this is incorrect in the short term, because multiple goroutines may call Receive at the same time, in the long run it is accurate enough. We rejected the obvious alternative, measuring time from Receive to Ack, because not every message will be acked. It is perfectly reasonable for a subscriber to nack (or fail to ack) a significant fraction of the messages it receives, but processing time for those unacked messages should still be included in the calculation of how many messages to pull. This change significantly improves the Receive benchmark -- messages per second is more than quadrupled. But there is more work to do. We should pre-emptively pull messages when the queue size gets low, and we should issue multiple ReceiveBatch calls concurrently. Besides performance, this change also improves behavior over current master at very low processing rates. Currently we pull a constant 10 messages per ReceiveBatch. If it takes a long time to process one message, then the other 9 will sit in RAM and may expire. With this change, we will pull just one message at a time if need be. Addresses #691. --- pubsub/averager.go | 81 ++++++++++++++++++ pubsub/averager_test.go | 57 ++++++++++++ ...riptionSucceedsOnOpenButFailsOnSend.replay | Bin 457 -> 292 bytes ...ntTopicSucceedsOnOpenButFailsOnSend.replay | Bin 420 -> 276 bytes .../TestConformance/TestSendReceive.replay | Bin 4073 -> 4621 bytes .../TestConformance/TestSendReceiveTwo.replay | Bin 6833 -> 7913 bytes pubsub/pubsub.go | 71 ++++++++++++--- 7 files changed, 198 insertions(+), 11 deletions(-) create mode 100644 pubsub/averager.go create mode 100644 pubsub/averager_test.go diff --git a/pubsub/averager.go b/pubsub/averager.go new file mode 100644 index 0000000000..04688d1272 --- /dev/null +++ b/pubsub/averager.go @@ -0,0 +1,81 @@ +// Copyright 2019 The Go Cloud Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "sync" + "time" +) + +// An averager keeps track of an average value over a time interval. 
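+// (In this package it tracks the average number of seconds spent processing a message.)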
+// It does so by tracking the average over several sub-intervals. +type averager struct { + mu sync.Mutex + buckets []bucket // always nBuckets of these, last is most recent + bucketInterval time.Duration + end time.Time // latest time we can currently record (end of the last bucket's interval) +} + +type bucket struct { + count float64 // number of points in the bucket + avg float64 // average value of the bucket +} + +// newAverager returns an averager that will average a value over dur, +// by splitting it into nBuckets sub-intervals. +func newAverager(dur time.Duration, nBuckets int) *averager { + bi := dur / time.Duration(nBuckets) + return &averager{ + buckets: make([]bucket, nBuckets), + bucketInterval: bi, + end: time.Now().Add(bi), + } +} + +// average returns the average value. +// It returns NaN if there are no recorded points. +func (a *averager) average() float64 { + a.mu.Lock() + defer a.mu.Unlock() + var n, total float64 + for _, b := range a.buckets { + n += b.count + total += b.count * b.avg + } + return total / n +} + +// add adds a point to the average at the present time. +func (a *averager) add(x float64) { a.addInternal(x, time.Now()) } + +// addInternal adds a point x to the average, at time t. +// t should be time.Now() except during testing. +func (a *averager) addInternal(x float64, t time.Time) { + a.mu.Lock() + defer a.mu.Unlock() + // Add new buckets and discard old ones until we can accommodate time t. + for t.After(a.end) { + a.buckets = append(a.buckets[1:], bucket{}) + a.end = a.end.Add(a.bucketInterval) + } + // We only support adding to the most recent bucket. + if t.Before(a.end.Add(-a.bucketInterval)) { + panic("time too early") + } + b := &a.buckets[len(a.buckets)-1] + // The new average is (old total + x) / (old count + 1). + b.avg = (b.count*b.avg + x) / (b.count + 1) + b.count++ +} diff --git a/pubsub/averager_test.go b/pubsub/averager_test.go new file mode 100644 index 0000000000..7f035d736b --- /dev/null +++ b/pubsub/averager_test.go @@ -0,0 +1,57 @@ +// Copyright 2019 The Go Cloud Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pubsub + +import ( + "testing" + "time" +) + +func TestAverager(t *testing.T) { + a := newAverager(time.Minute, 4) + start := time.Now() + for i := 0; i < 10; i++ { + a.addInternal(5, start) + } + if got, want := a.average(), 5.0; got != want { + t.Errorf("got %f, want %f", got, want) + } + + a = newAverager(time.Minute, 4) + start = time.Now() + n := 60 + var total float64 + for i := 0; i < n; i++ { + total += float64(i) + a.addInternal(float64(i), start.Add(time.Duration(i)*time.Second)) + } + // All the points are within one minute of each other, so they should all be counted. + got := a.average() + want := total / float64(n) + if got != want { + t.Errorf("got %f, want %f", got, want) + } + + // Values older than the duration are dropped. 
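+	// The second point below is added two minutes past the end of the current bucket,
+	// well outside the one-minute window, so only it should remain in the average.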
+ a = newAverager(time.Minute, 4) + a.add(10) + if got, want := a.average(), 10.0; got != want { + t.Errorf("got %f, want %f", got, want) + } + a.addInternal(3, a.end.Add(2*time.Minute)) + if got, want := a.average(), 3.0; got != want { + t.Errorf("got %f, want %f", got, want) + } +} diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay index ed2f8b572a8e7245ef0eff29798b245cf445ac82..d3e9a060ba6a053fd018ef5f3acfe30d29e63442 100644 GIT binary patch delta 29 lcmX@fyo71OW=_T&1_lNWCaIK(do>uXCqB1kQDD?y1OR#_2WJ2P delta 168 zcmZ3&bdq_(W-cy4Mg|5BCaDgliF-7d?lDe$YRxLOfRRgM;y(F$m(--vbkDrBe4!Gd zBIo?loD_w;{1Sz<%)AtZ;L@bxip=7Y)Vva1WLZrX1x5`<0DQ|iO8@`> diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentTopicSucceedsOnOpenButFailsOnSend.replay index 5cdf7424b42b1a2a619196a18d2da0b459401cb2..47107aefd3e506dac9661b8a7ed34b910a782cc4 100644 GIT binary patch delta 41 xcmZ3&JcVh(YOYiU1_ll$saP)6i8j(aMFq)v!6k_$rNu%f6YrU@C@^X;0s!JR3c3IQ delta 153 zcmbQjw1j!WYVNNL3=AAhQg0Z!R43X)Y?ZI53$vO*-9O zErbF!(R_g5T?l>x(LUZsks>{33kByyo}&5rbbxjiqnS8O`H5-2P~xaenhKXPfs~(& z(71CoT8Z&jBU7|!@Pwn$<=N-VD={5HlPU}8AuB?M0-ktV}SVktrq zTqTs29bhdv$>96~QIu!m+2m4!j!3z1Z~~hpi%DUAaxpy{4EkJHsFL*0$6W-;BjZhW zVg6a}W5YvRo6kQDoO*lr%{V&zN_STu8tLkCSG#&YENfF9ZA(gm&19vb2sj>?-y zE^am@)hT+%)Xi#;Uf~z6O`yy0csn3-k|HsM6$`m0D`oV=7fpYk`;F*tyT9Z5TjGeN zgY*~n9;m-!is0kZu^AV$Ska@u#%!wVuQ6L1`rEj*J@HS=3bopbGcm=7iG-uHsLxg! z^QB%^V?OMc_24~;9zWFOFMPj?ynCTpdD|t?2SiX`67NAet*|MnZiO9Q8I4%V$RWoJ xaO^Klz(}J;jU*wv8k^hi?;10kH*xbL2)@#8E;ar+Y0a&Z`&u{0yb&LVvI)J$-5`dMiUR-7cX8EX!#LY7!sI{gan%RY2WYnc|K2{mk%>MRHRgG zqW;;df6WMb)s~gztOTt}C9PJ{)-BQ+uB4^BnuBE0TJD2uT1|mgeR)dB zt5#7i_C_~XqpKjKl;uq*YHEE6E=qDGgKJRLa69RuNw-aF+*Gwi>z5e0v>}&^sgelS z8~@i}DZ@h%=C>f;T!xbm8k?Q`w|l^l3rN#4Td(M~x_M$bpKg0*Tu`*UT(W#Y06CC% z-BIXs1SI==SkG7Esm+HRZDsdroJ2>OpPzbu4u%rCe}aTAOkiMn%DnUAN^%BFn|Jni z5zO>nN9l5JU35?~epwzNfEuV4i~t_qtJIl5Z;uuzbQT~4dI}&Z6d`Wv*$V*2-|b9Q zfl`J1;^-wjfNVoNKykEV!+~*q!u%G$JszHf)|s`m(t$!fZe?zgog$+ z2UFl90-hj|nA5`+szrw7gF;R7GBqW^3Jx}~#^NhpCnpemWI2~m<3chZM8zcI%kka< z4FlUtWPr(WTvm&+BqwIFt6tVu*pB-JzYrl38GlR)M6*FQ#tR%N;jx$+4Eu6;f=V#4 zY6OaLDKFkyO>$L_hmTiqnl5?ED+Ha)W$E~GFck7S@lcfH(8aDsVfhpEg<12fHwRy* z=XaicHyx{>^&UZfJ}=sxNzNlMyuZJoHs*$)4yGMJyB~bx5R^S6LzxKA^=2qT(IwFy zOtBtMD0)PO+K?K8I*@e)z5Eu#W;gW_2J1_lM=X@&ssuAxXn8jY+yjBv71ZMI*LYv- z|35sh_5Rj(#`Z!9wr1>y$be>d1+~l`@x)E)Gr@JR*f0BKS(DS14J%tzv>G-y*XW1{ zI(T(`HVr8Fkm`DB8fSY)kFC0f?_{Ii;^=$RFjX`^OwgH6I%@sTW;!L53HUIzs diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay index f8304a006a257b2f8c2e0e0bc5c499437d2200a5..bba7d490b7bf29b238cc5fcdd8ef83b93e6e15d0 100644 GIT binary patch literal 7913 zcmeHM&u`mg819!+)_nzScxWmS2(4q9H{`^h9WTiRDLoF$Q_%KCAtIBTxk*{FMv9a~-y8dV z-sgGV*WV|s#ssV>rPfUQ^?mz24~X;rs;X8M?9)t1HznT<7vp!#;a?NvrEVS=}6r-djy%U|MUc?_=4}+e;`}QOz=CVBMe?bSaF2e#81x z-Jfm$6jK`&wYgbp$avlQzJ?oR0n5041828XYHuZlKC9`~cv)&PcYaW?gmfB0~Y%A3z~u)P5JGKFz9rTfZ4Y>2k`vrjP zP~OoA5b$-g?WDweBe!8<0RozM5rPr{=(O$0Nf>y#o6|M5p=06|c!K4E!4P+e3$ZMs zX+$2ITvSj86*MYT7{szn0EJod_~fF(bWmYNg-V!1K`s;u(&WM|TR~?A71U{=RwrIH z6~)pIJtRdHr7ND&m^<$xAnaCo(=+5`VC5p+d z0Y16LQ&;0$LZpREv{umbVlgRZCNv&manM9(R3S3L@Qk1EsrM##})qq^iYnr^gU0L2rMuo%*m2a$W2Q;dt zo7BE*G0*YN&V^-Bx$Uw!uDA9E*n|9V&_U%owbS-*5s 
zR%zU2{e0!RJtC5h#Ei%r4RuSwmkKfdc7J2e-TY4YFvxlm{(x3us5+6xMGPx9+zr4Q!dmXB)z+Q*xDe&%x zKkvW?hiEIs!K79ye6?^nn=`mPyH8RVA?P)jZc@DlvGh{MR_@+>_z56S9I4Ws^B;(J zlij14Mc~F(P90Ir9n}ux25u^KdWuG{w7FhL2WV>Lz$4hJlRbhli&Hj&G0)5h=FBlS z3iM9V2^N1gWgZU Tkb6;mL0v+O{@B+|%IE$8|5|VK literal 6833 zcmd^EL2T1j7_QTHlm`>V5~2ilfK8eQa^hzvO>;qN5<5xN(#B1qBob7K^Ke3)*o~cr zaG^q)w8OO1kT`Cdc4N|{U8YTJleln#BWEtt?h{Ce3vuiw5h=!44v-KjQewRqzwdkB z_x*pK`Hd)#6sg#r>b&lD-g6lKrC*d~Q9?eYSyG!N-!AlRG)q!dt)PZK+CW7OrDdfm zcyEI9T3bOrV|Y=isy;!k4bSe5##3NjX~@@*psAf9q#()7GN~a|BWbu8h9SSE|ETIu zcP>Tc`c=76E7k?HrGMW<^)inHRNY1CmP~d>lJIMNy`J~J3<5iYb&2y2L*MLBC^V|2 z9&_deMXSnn@4py;wiq0oh5w3y%hpVGc1^!)_a?Ro9h^Wy-0h8;()satpF4tkf9&9% zn<7ADW^V847llJ$c5d(9PZ%*hSsI}~E{(Vk_aKeT{%r*C=fgSD4`B0?_cjI?TiIbw z;L%RAlce|+*JBtz0x$~y1YkuRqt=Z~PJ+POElySBx{C1Q;20BJ42S42v&e)X9SRbM z#+ManvI1RVg$Xc0YAMJt3~_jTS)oloEEfK6_)+-nq8E zD^TG=MGWU6E32!qAi0{p#Np=w*7JtfaC~3e=Z?<(`TPG*X6OHT_~Jj$J4LkhDxz}z zx>AlS1-6xpy$MlI*CwLAF8hdn`To1RS#ga^C9L zek%76?cMq2frt3CW^}-WrSF!7?Yi8OP+3IYzdRsh%YFls_FZ_(Gh!R!AHIdgJLu>I z0F6fB0{F^G1`S!0OUuVW>2P_%k1ariIJJ}TyxtA?gy0kij znRU!eqoNc8%G$wY)^3E4q9{3T94rxyW#eFp$Zq36%uiIArCl$x?pw%7xSW`%D&_tz zjUX1>u2QdE;+qQTYklrSNGBf~|7NYvm19Uk8X7zy_Wp>0i&kA#?_7U)?tYKGHQL^X rcEZQ0$>jq&;p6llhCVDG*ce-EJh1r|v)K&~OxhQO`;Tv&K>q9{P51tS diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 167cafeef8..4c3ba8ca4e 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -36,8 +36,10 @@ package pubsub // import "gocloud.dev/pubsub" import ( "context" "fmt" + "math" "reflect" "sync" + "time" gax "github.com/googleapis/gax-go" "gocloud.dev/gcerrors" @@ -211,12 +213,33 @@ type Subscription struct { ackBatcher driver.Batcher cancel func() // for canceling all SendAcks calls - mu sync.Mutex // protects everything below - q []*Message // local queue of messages downloaded from server - err error // permanent error - waitc chan struct{} // for goroutines waiting on ReceiveBatch + mu sync.Mutex // protects everything below + q []*Message // local queue of messages downloaded from server + err error // permanent error + waitc chan struct{} // for goroutines waiting on ReceiveBatch + lastReceiveReturn time.Time // time that the last call to Receive returned + processTimeAverager *averager // keeps a running average of the seconds to process a message } +const ( + // The desired length of a subscription's queue of messages (the messages pulled + // and waiting in memory to be doled out to Receive callers). The length is + // expressed in time; the relationship to number of messages is + // + // lengthInMessages = desiredQueueLength / averageProcessTimePerMessage + // + // In other words, if it takes 100ms to process a message on average, and we want + // 2s worth of queued messages, then we need 2/.1 = 20 messages. + // + // If desiredQueueLength is too small, then there won't be a large enough buffer + // of messages to handle fluctuations in processing time, and the queue is likely + // to become empty, reducing throughput. If desiredQueueLength is too large, then + // messages will wait in memory for a long time, possibly timing out (that is, + // their ack deadline will be exceeded). Those messages could have been handled + // by another process receiving from the same subscription. + desiredQueueLength = 2 * time.Second +) + // Receive receives and returns the next message from the Subscription's queue, // blocking and polling if none are available. This method can be called // concurrently from multiple goroutines. 
The Ack() method of the returned @@ -228,6 +251,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { s.mu.Lock() defer s.mu.Unlock() + defer func() { s.lastReceiveReturn = time.Now() }() + + if !s.lastReceiveReturn.IsZero() { + s.processTimeAverager.add(float64(time.Since(s.lastReceiveReturn).Seconds())) + } + for { // The lock is always held here, at the top of the loop. if s.err != nil { @@ -238,10 +267,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { // At least one message is available. Return it. m := s.q[0] s.q = s.q[1:] + // TODO(jba): pre-fetch more messages if the queue gets too small. return m, nil } if s.waitc != nil { // A call to ReceiveBatch is in flight. Wait for it. + // TODO(jba): support multiple calls in flight simultaneously. waitc := s.waitc s.mu.Unlock() select { @@ -255,12 +286,31 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { } // No messages are available and there are no calls to ReceiveBatch in flight. // Make a call. + // + // Ask for a number of messages that will give us the desired queue length. + // Unless we don't have information about process time (at the beginning), in + // which case just get one message. + nMessages := 1 + avgProcessTime := s.processTimeAverager.average() + if !math.IsNaN(avgProcessTime) { + // Using Ceil guarantees at least one message. + n := math.Ceil(desiredQueueLength.Seconds() / avgProcessTime) + // Cap nMessages at some non-ridiculous value. + // Slight hack: we should be using a larger cap, like MaxInt32. But + // that messes up replay: since the tests take very little time to ack, + // n is very large, and since our averaging process is time-sensitive, + // values can differ slightly from run to run. The current cap happens + // to work, but we should come up with a more robust solution. + // (Currently it doesn't matter for performance, because gcppubsub can't + // caps to 1000 anyway.) + nMessages = int(math.Min(n, 10000)) + } s.waitc = make(chan struct{}) s.mu.Unlock() // Even though the mutex is unlocked, only one goroutine can be here. // The only way here is if s.waitc was nil. This goroutine just set // s.waitc to non-nil while holding the lock. - msgs, err := s.getNextBatch(ctx) + msgs, err := s.getNextBatch(ctx, nMessages) s.mu.Lock() close(s.waitc) s.waitc = nil @@ -276,14 +326,12 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { } // getNextBatch gets the next batch of messages from the server and returns it. 
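+// nMessages is the maximum number of messages to request from the driver in a single ReceiveBatch call.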
-func (s *Subscription) getNextBatch(ctx context.Context) ([]*Message, error) { +func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Message, error) { var msgs []*driver.Message for len(msgs) == 0 { err := retry.Call(ctx, gax.Backoff{}, s.driver.IsRetryable, func() error { var err error - // TODO(#691): dynamically adjust maxMessages - const maxMessages = 10 - msgs, err = s.driver.ReceiveBatch(ctx, maxMessages) + msgs, err = s.driver.ReceiveBatch(ctx, nMessages) return err }) if err != nil { @@ -342,8 +390,9 @@ var NewSubscription = newSubscription func newSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription { ctx, cancel := context.WithCancel(context.Background()) s := &Subscription{ - driver: d, - cancel: cancel, + driver: d, + cancel: cancel, + processTimeAverager: newAverager(time.Minute, 4), } if newAckBatcher == nil { newAckBatcher = defaultAckBatcher From d0e8dc08efc075413d53cc7d2d3eecf6dc564681 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Mon, 28 Jan 2019 18:24:48 -0500 Subject: [PATCH 2/5] reviewer comments --- pubsub/averager.go | 11 +++++------ pubsub/averager_test.go | 2 +- .../TestConformance/TestSendReceive.replay | Bin 4621 -> 4617 bytes .../TestConformance/TestSendReceiveTwo.replay | Bin 7913 -> 7935 bytes pubsub/pubsub.go | 6 +++--- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/pubsub/averager.go b/pubsub/averager.go index 04688d1272..aa43cf96c0 100644 --- a/pubsub/averager.go +++ b/pubsub/averager.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Go Cloud Authors +// Copyright 2019 The Go Cloud Development Kit Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,8 +29,8 @@ type averager struct { } type bucket struct { + total float64 // total of points in the bucket count float64 // number of points in the bucket - avg float64 // average value of the bucket } // newAverager returns an averager that will average a value over dur, @@ -49,10 +49,10 @@ func newAverager(dur time.Duration, nBuckets int) *averager { func (a *averager) average() float64 { a.mu.Lock() defer a.mu.Unlock() - var n, total float64 + var total, n float64 for _, b := range a.buckets { + total += b.total n += b.count - total += b.count * b.avg } return total / n } @@ -75,7 +75,6 @@ func (a *averager) addInternal(x float64, t time.Time) { panic("time too early") } b := &a.buckets[len(a.buckets)-1] - // The new average is (old total + x) / (old count + 1). - b.avg = (b.count*b.avg + x) / (b.count + 1) + b.total += x b.count++ } diff --git a/pubsub/averager_test.go b/pubsub/averager_test.go index 7f035d736b..f62b0851cf 100644 --- a/pubsub/averager_test.go +++ b/pubsub/averager_test.go @@ -1,4 +1,4 @@ -// Copyright 2019 The Go Cloud Authors +// Copyright 2019 The Go Cloud Development Kit Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay index 593cce3c72a8c63298c322ea8571e20b494019ca..2d06c0b25a0c6408737dc92a1f746b1b909f0b70 100644 GIT binary patch delta 988 zcmeBG=~USe!@_TAW^Qa|Y-DC^X=-Gq!8*B)b^Bx`R$pF_xS_e3fvKsfh2iEJRyIbk zkb!}bxuvC{f$8LawgcRvj0_AMOj3PJT!s@JR43Q7^RnDvV`fH5 zYKW7MpQ~GHvZINvOHfW_TDY4*lAC9ulcAeSSYAqEgp;dZWKM;DkYh@fZ$^P}ihD|B zVpw^gV|kE!cwR|LL`Z;DK zyYCUJKts!^v&@rIIrcNXVBfrthl`O$4)W&VdkG1JN6he0s9(d(b&-*4F=L>Um#@F0 zpKEZKZ&6@*abdWVo4=E@Yeqnpqi#@`pKGXNWVnx;p?k2KtFe1vMt*3ZX+T7#Z@gcy zV}4Ljva@5hg|oMtp{Z|nu|Z&1J}}T+{oEtGETW7Jvm#s_J)8_GlDu=ml3W7~&62_@ zBK+MvJzYJ5BF!T6!hn_qdO8~fg`^s%re}DkdgVkKghplLTSjDf`S`e)>iSkixdn!q z2x)RL8bJfp2o|95cvj-(_y7se$NRWQk6K^Ms4Wh4D)({>%PWZp&q9e>eNfaILS14A za|t{V!lU-U>!UZAHF$F;@8>aK+&B3Hk2Pb{)Y?ZI53$vO*-9O zErbF!(R_g5T?l>x(LUZsks>{33kByyo}&5rbbxjiqnS8O`H5-2P~xaenhKXPfs~(& z(71CoT8Z&jBU7|!@Pwn$<=N-VD={5HlPU}8AuB?M0-ktV}SVktrq zTqTs29bhdv$>96~QIu!m+2m4!j!3z1Z~~hpi%DUAaxpy{4EkJHsFL*0$6W-;BjZhW zVg6a}W5YvRo6kQDoO*lr%{V&zN_STu8tLkCSG#&YENfF9ZA(gm&19vb2sj>?-y zE^am@)hT+%)Xi#;Uf~z6O`yy0csn3-k|HsM6$`m0D`oV=7fpYk`;F*tyT9Z5TjGeN zgY*~n9;m-!is0kZu^AV$Ska@u#%!wVuQ6L1`rEj*J@HS=3bopbGcm=7iG-uHsLxg! z^QB%^V?OMc_24~;9zWFOFMPj?ynCTpdD|t?2SiX`67NAet*|MnZiO9Q8I4%V$RWoJ xaO^Klz(}J;jU*wv8k^hi?;10kH*xbL2)@#8E;ar+Y0a&Z`&u{0yb&Ej`7|utUCHNKU@RJRpi8%V-s5(FGE!jD}8Nu;*&&ll{*!7er> z@zt&`m|F`u_uvbcJAd6m}QDYyFY5s%Bqx?;@@M?G6Z#*q{qFX5-`YKl7CCA0wCU z`7s>F_FlmA-)!>+ue z6~Llfb<0Vy>jSr8Y#IUrdk+F014Q2O-YO!X4!l&;!3YzOxP^*R7ow!g} zRa-xVONy#$TRi-bJMSzw?X=2od@d&gI~NwU)bkL)zXALaaLG!7O{rXl$gX763d>a( zIn6~Hh0PMj((z?l*_PN%nayo)rg=J*=g7#-!cMW0NflS)3|ojRvV^$AN-jmOiSc3} z$*RqbGEE3{nop2)vb02G%4{mPrf%oixU`d%^{})oHTjapBp5SMs>W8!$$W`QXJjL{ z)>v1V*sA>b%1SaC$nLN-Q;Nn@73>lO9fkNE-S0E}{PfhbJNN&YnA!dH?i~L9ebS}s+kwKC_%Tsn=WV=nDR(MfCf!CwS^>R6ip-vEY4 zz0q#g)GYAgEgv3}%^TTX(Ga}U>dXTTj%($u zR`!#U6Kh+h~CdA+mgT18u>2vgli2VQn literal 7913 zcmeHM&u`mg819!+)_nzScxWmS2(4q9H{`^h9WTiRDLoF$Q_%KCAtIBTxk*{FMv9a~-y8dV z-sgGV*WV|s#ssV>rPfUQ^?mz24~X;rs;X8M?9)t1HznT<7vp!#;a?NvrEVS=}6r-djy%U|MUc?_=4}+e;`}QOz=CVBMe?bSaF2e#81x z-Jfm$6jK`&wYgbp$avlQzJ?oR0n5041828XYHuZlKC9`~cv)&PcYaW?gmfB0~Y%A3z~u)P5JGKFz9rTfZ4Y>2k`vrjP zP~OoA5b$-g?WDweBe!8<0RozM5rPr{=(O$0Nf>y#o6|M5p=06|c!K4E!4P+e3$ZMs zX+$2ITvSj86*MYT7{szn0EJod_~fF(bWmYNg-V!1K`s;u(&WM|TR~?A71U{=RwrIH z6~)pIJtRdHr7ND&m^<$xAnaCo(=+5`VC5p+d z0Y16LQ&;0$LZpREv{umbVlgRZCNv&manM9(R3S3L@Qk1EsrM##})qq^iYnr^gU0L2rMuo%*m2a$W2Q;dt zo7BE*G0*YN&V^-Bx$Uw!uDA9E*n|9V&_U%owbS-*5s zR%zU2{e0!RJtC5h#Ei%r4RuSwmkKfdc7J2e-TY4YFvxlm{(x3us5+6xMGPx9+zr4Q!dmXB)z+Q*xDe&%x zKkvW?hiEIs!K79ye6?^nn=`mPyH8RVA?P)jZc@DlvGh{MR_@+>_z56S9I4Ws^B;(J zlij14Mc~F(P90Ir9n}ux25u^KdWuG{w7FhL2WV>Lz$4hJlRbhli&Hj&G0)5h=FBlS z3iM9V2^N1gWgZU Tkb6;mL0v+O{@B+|%IE$8|5|VK diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 4c3ba8ca4e..25a56b7971 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -301,9 +301,9 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { // n is very large, and since our averaging process is time-sensitive, // values can differ slightly from run to run. The current cap happens // to work, but we should come up with a more robust solution. - // (Currently it doesn't matter for performance, because gcppubsub can't - // caps to 1000 anyway.) - nMessages = int(math.Min(n, 10000)) + // (Currently it doesn't matter for performance, because gcppubsub + // caps maxMessages to 1000 anyway.) 
+ nMessages = int(math.Min(n, 1000)) } s.waitc = make(chan struct{}) s.mu.Unlock() From 97a23fa1e5a6ddb31f8b58e0abd52fe0e55d14ee Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Tue, 29 Jan 2019 07:02:04 -0500 Subject: [PATCH 3/5] switched to simple moving average --- pubsub/averager.go | 80 ----------------------------------------- pubsub/averager_test.go | 57 ----------------------------- pubsub/pubsub.go | 30 +++++++++------- 3 files changed, 17 insertions(+), 150 deletions(-) delete mode 100644 pubsub/averager.go delete mode 100644 pubsub/averager_test.go diff --git a/pubsub/averager.go b/pubsub/averager.go deleted file mode 100644 index aa43cf96c0..0000000000 --- a/pubsub/averager.go +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2019 The Go Cloud Development Kit Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "sync" - "time" -) - -// An averager keeps track of an average value over a time interval. -// It does so by tracking the average over several sub-intervals. -type averager struct { - mu sync.Mutex - buckets []bucket // always nBuckets of these, last is most recent - bucketInterval time.Duration - end time.Time // latest time we can currently record (end of the last bucket's interval) -} - -type bucket struct { - total float64 // total of points in the bucket - count float64 // number of points in the bucket -} - -// newAverager returns an averager that will average a value over dur, -// by splitting it into nBuckets sub-intervals. -func newAverager(dur time.Duration, nBuckets int) *averager { - bi := dur / time.Duration(nBuckets) - return &averager{ - buckets: make([]bucket, nBuckets), - bucketInterval: bi, - end: time.Now().Add(bi), - } -} - -// average returns the average value. -// It returns NaN if there are no recorded points. -func (a *averager) average() float64 { - a.mu.Lock() - defer a.mu.Unlock() - var total, n float64 - for _, b := range a.buckets { - total += b.total - n += b.count - } - return total / n -} - -// add adds a point to the average at the present time. -func (a *averager) add(x float64) { a.addInternal(x, time.Now()) } - -// addInternal adds a point x to the average, at time t. -// t should be time.Now() except during testing. -func (a *averager) addInternal(x float64, t time.Time) { - a.mu.Lock() - defer a.mu.Unlock() - // Add new buckets and discard old ones until we can accommodate time t. - for t.After(a.end) { - a.buckets = append(a.buckets[1:], bucket{}) - a.end = a.end.Add(a.bucketInterval) - } - // We only support adding to the most recent bucket. 
- if t.Before(a.end.Add(-a.bucketInterval)) { - panic("time too early") - } - b := &a.buckets[len(a.buckets)-1] - b.total += x - b.count++ -} diff --git a/pubsub/averager_test.go b/pubsub/averager_test.go deleted file mode 100644 index f62b0851cf..0000000000 --- a/pubsub/averager_test.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2019 The Go Cloud Development Kit Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "testing" - "time" -) - -func TestAverager(t *testing.T) { - a := newAverager(time.Minute, 4) - start := time.Now() - for i := 0; i < 10; i++ { - a.addInternal(5, start) - } - if got, want := a.average(), 5.0; got != want { - t.Errorf("got %f, want %f", got, want) - } - - a = newAverager(time.Minute, 4) - start = time.Now() - n := 60 - var total float64 - for i := 0; i < n; i++ { - total += float64(i) - a.addInternal(float64(i), start.Add(time.Duration(i)*time.Second)) - } - // All the points are within one minute of each other, so they should all be counted. - got := a.average() - want := total / float64(n) - if got != want { - t.Errorf("got %f, want %f", got, want) - } - - // Values older than the duration are dropped. - a = newAverager(time.Minute, 4) - a.add(10) - if got, want := a.average(), 10.0; got != want { - t.Errorf("got %f, want %f", got, want) - } - a.addInternal(3, a.end.Add(2*time.Minute)) - if got, want := a.average(), 3.0; got != want { - t.Errorf("got %f, want %f", got, want) - } -} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 25a56b7971..ea493853b3 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -213,12 +213,12 @@ type Subscription struct { ackBatcher driver.Batcher cancel func() // for canceling all SendAcks calls - mu sync.Mutex // protects everything below - q []*Message // local queue of messages downloaded from server - err error // permanent error - waitc chan struct{} // for goroutines waiting on ReceiveBatch - lastReceiveReturn time.Time // time that the last call to Receive returned - processTimeAverager *averager // keeps a running average of the seconds to process a message + mu sync.Mutex // protects everything below + q []*Message // local queue of messages downloaded from server + err error // permanent error + waitc chan struct{} // for goroutines waiting on ReceiveBatch + lastReceiveReturn time.Time // time that the last call to Receive returned + avgProcessTime float64 // moving average of the seconds to process a message } const ( @@ -238,6 +238,11 @@ const ( // their ack deadline will be exceeded). Those messages could have been handled // by another process receiving from the same subscription. desiredQueueLength = 2 * time.Second + + // The factor by which old points decay when a new point is added to the moving + // average. The larger this number, the more weight will be given to the newest + // point in preference to older ones. 
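+	// A new point x updates the average as avg = avg*(1-decay) + x*decay.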
+ decay = 0.05 ) // Receive receives and returns the next message from the Subscription's queue, @@ -254,7 +259,8 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { defer func() { s.lastReceiveReturn = time.Now() }() if !s.lastReceiveReturn.IsZero() { - s.processTimeAverager.add(float64(time.Since(s.lastReceiveReturn).Seconds())) + t := float64(time.Since(s.lastReceiveReturn).Seconds()) + s.avgProcessTime = s.avgProcessTime*(1-decay) + t*decay } for { @@ -291,10 +297,9 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { // Unless we don't have information about process time (at the beginning), in // which case just get one message. nMessages := 1 - avgProcessTime := s.processTimeAverager.average() - if !math.IsNaN(avgProcessTime) { + if s.avgProcessTime > 0 { // Using Ceil guarantees at least one message. - n := math.Ceil(desiredQueueLength.Seconds() / avgProcessTime) + n := math.Ceil(desiredQueueLength.Seconds() / s.avgProcessTime) // Cap nMessages at some non-ridiculous value. // Slight hack: we should be using a larger cap, like MaxInt32. But // that messes up replay: since the tests take very little time to ack, @@ -390,9 +395,8 @@ var NewSubscription = newSubscription func newSubscription(d driver.Subscription, newAckBatcher func(context.Context, *Subscription) driver.Batcher) *Subscription { ctx, cancel := context.WithCancel(context.Background()) s := &Subscription{ - driver: d, - cancel: cancel, - processTimeAverager: newAverager(time.Minute, 4), + driver: d, + cancel: cancel, } if newAckBatcher == nil { newAckBatcher = defaultAckBatcher From c839c149778dc6231e3cc403309c67aa92c462d0 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 30 Jan 2019 16:58:46 -0500 Subject: [PATCH 4/5] measure process time by ack --- ...riptionSucceedsOnOpenButFailsOnSend.replay | Bin 292 -> 292 bytes .../TestConformance/TestSendReceive.replay | Bin 4623 -> 4624 bytes .../TestConformance/TestSendReceiveTwo.replay | Bin 6879 -> 7941 bytes pubsub/pubsub.go | 66 ++++++++++-------- 4 files changed, 36 insertions(+), 30 deletions(-) diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay b/pubsub/gcppubsub/testdata/TestConformance/TestNonExistentSubscriptionSucceedsOnOpenButFailsOnSend.replay index aab21650975e91df6e952672e64c730a4cf7e249..d3e9a060ba6a053fd018ef5f3acfe30d29e63442 100644 GIT binary patch delta 11 ScmZ3&w1jEGW=6(|TM7UewFE~1 delta 11 ScmZ3&w1jEGW=5`wTM7Ue*#uAk diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay index 2951187c05485ebcaab989707eb9b2139b655278..7bf9d3dcd7fce13d199feb32b934b325d7f7d61f 100644 GIT binary patch literal 4624 zcmeHLO=#0#7&e`AHS=%5KYDWt#z}v^{_i5BYcsZWTbHFjtKw+VuW8n-iA}n8#lsLh zc=4d%(TgDH$%_cif!;iMQDMj7L3i;a9@L9%(?4uiV`-7X0);euZ}UCR`#jIfS1K?; zK}m?MA1Gc=iXT0KT{Lr|$O+ISWf?iknAU(PkYxm3PQr|NA_HRzq(zBm4L8v-WnF?M zb$LwUWfLo|HdmKdV{@oq%7`nFRpjClBP)p6xKV+!VzdJ1IB=Pj(vQk!y7*{9Oee(5 zYAnsd<>n2uABh5Q;cmX{g9tcPblK8v)kU{`^$?^Q|MGTYUs{^Z<@}4(d0)Ux*H& zgZhoFM+iRfO10AYW?j^+RJk+ULr|M0EmR69cq3c%1a`H%hQfwX#DX0{kr;+p=GE>^ zq4s~@~Np9 z>6@p4eI-x`ax=c*B9WU85PUKYX^(#{MJCu>(CDRv^+cGoFy!_*I}G3gvf=EgYa?K zSVLSG{y_CWGe%5F%Kur1|w1=?A2nx+Hk5Gur^%k27LExSJ!o1OKfhb6)q;2Z-I-H z!vsMRAzPk_6tv)~O_&O<`h=+C`tsn14nMl@%sryNhst}QhI!j_q8AFVdQQB9Xbi)K zqG}j+`PA4+sEq7uY!8j~Fa zE^4{uv?{=b((hTQW>_ddc?CLmmpO^MrOn#@?;bGZG_p&XyB_qcNuH3C`6_4S8@eHD 
zs$&}gL`&S&MxoCTkZR7MnXl4QcOUY1Dr;9e3ACg6`DNwjM_WL*dI;!99|i&k1~+yt zqy|9$;KtS_g4y3#QaWE>7kg4Fe{(!UfTyKeC$H*nw{rZhQ7+ks~I*cvD2&B z$utw1;RtFex;`gPh2|1;Y(7fMQU-E?@N||*W@5B6$SP}#aR%p^Nj@O5fj}ZV6<-Za zCzaJ1md>n4B;5l^2~R4K1YOjFbE?iKxS5EQzP7NQjulgK%%701hr>ayGqTPyv3V~Y z64B8byKwv|_mRVcKewKL=^x&B_ie}4VO%(q6pFD1xKhHLRELY?8H%He3=>zSWPGU= zTvg3mxN0=JhwIb!V>>p~+vTcTrx&cL*k~cEYDEQRMCf>H17j^|t|q7=y?eH9Y7gxd z-4Ir1y$mijBQmEM8edp+g_d<=4LiD@sD=jDRU>F8QNyRK2JpbvP6POebl51(1|W!y z8bB%)Z)lWaA`mk|^Qt$K2)1IB)ssNQD65@(&nP!PzqDgV_A9ty1$Q+=udK6F(|JA! z6SHUM-Rx#V JY=?|9zX2DEAgKTV diff --git a/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay b/pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay index 9eb845832efc96fc14abdf0d37080ce4c53601fa..f480a0e5cceefa6d676f4fcf599705bf380af31d 100644 GIT binary patch literal 7941 zcmeHM&u`mg819!++I<6U7=}s&Li>T{ZA#+Lc9Qji&9<;)W6>}* zCH^kFY;T)5=`5E_%}Of9#&GoBYN8B_X3MyZ72E18NhrF}t|n}3*$EnDW>7X|cmLE% ziJhNjjb_bgZAeW8-|Bw9j+<2hD_FaO#Z4ozw=#j=>)vndKfetUybAV!OAV)fy#t}r z(&RpJ?hmG|8O``Z0Ff^Q2cw940KVmoWT$G~tL|OI7odXy2}FclsIe|SvG`ws(*1tq z(mgju!rU`ccmKF>a}rKW-M#-gAjgjuN9g|-N5W6{AdXCY>onl=!+m7G0lS~#9|6et z>K$%?fZu6%Iw|pX=xvxd2?0es3xPxcs_69OdC2~=o3l)#X<_08c#6p&hDtNpG{s&; zY=)d1T~v^Z3JMFAs|?D}*(^(wCq@?)+C_y936&0C(iw_nvKjK^eMdoOhZW>nXxE8L zZC&r`hw@2LUGIrUir9Il!5POYKaBaD47#}p*j3L@0PX_#1rYNpClqvXky>6_)Kx)L z%k_DY-7GaKqQG<4cx|gHH0r|2R%2e`3q=vJw@W*#>SAHF%&9_&)9O_$=9X3(JXTaL zkr(vs+8R&E{K70(;d5NGF0I!KD>wA5qQF&mmg^=}yS9L2G+*X8VXiW#%WGmW-z;ec zv$(o*Q!CG`*B6$S^6A9#PK7U5(p*6$E_o$!|@<8yf+@Q3@IMh%IrZhtghz{l3`;t$1UX6m>GH9 zmkj&%-IHP8e)nbg$?vjdOWqdkz( z(?@I|qr)7?BBfan8O9ISK<3@1p7vbiLxz5ziw@O5<|8lP1KBG_$|FM+XQbo{BF@*2 zYZc8r^Ate^snzQboP8AUv#LDSv5f~$o#LMY_@>tbbpNbyIJUPZaVy#_2|5TKXOAu( kCRe0;v<$!7(so!9owV>lJ9KCl%EHV= z_OLgv{sCsA@nAN_i;2mun!W6CH74FYc<}DQn?M1{Vi-hbU62F<&G)u_pZ9&9=cOOd z#&~Gz^66Ip^+o^fLeO{oHN&Xs&}X)lR$KAi!hLL8(Y2NeoBmi6$`%w1Q!Bea1lO%o z6Z-7svZ=LvW#eQvdbk?pK;CQ`$FOX*`b)fJ-Dp?57HnBw0uS%w5x+G!Y0EG455{1i8@x z)6$HF`&R_u3n4fig?@>E_va(ouiD_M!;35g=yZS=M^{~_tpPsU?mw1F_u<;5ySL>5 z(dS&f-`_6n0H>>W|1RQjY*vpD|F0fdeY$_wBhGJa0e)*%NB$Dv(Kq+U2nZ~s9ispq z-fZ`k6#Zc3GK}s51Vf((h>Rjw-qz$K2>!F1Yneu)1<{wmr9i0f24t8d!(lRnk)ECP zMFk(Bg0Bjd2tfw+u|Oo^d0~A~Ax5YWt3f3YCc+rr{|sWD-TO9!-kfF7$U+BB%(iuX zz#nYE7S;7JdE^Ur-Yek2_RuQ7b}l#>407S&h&{KR;70`9MYw1xo2R05LCQ9{PV44j zl*vb9w8|b+US7)4B6Z_1!{CVmLkAL^YKUBjEo#}LT!AupKAEC0GL~lWP`2J8I7yVF zIGsxrQ)DTCYein9sct!zktBuY@US9vi+LuQJP25%T+C^rDjvk@QK8a+m=zYHO*SZ& zRAaxWrZSm$$eTTr7)}bJ*QZ3o{Uqu4UUc;y-226O`MXaaJ#;+Pq~zJ8QiiBfNvd8F zVe#CgCY&E9HR0?esov*b{onvUA>cDah}S5d)`c{d&7}1jFVr|SDFi!gT@rYPN-$ct z!q-(k*R3aIhRzFk@R&UlYiU|6#JC(wX=()u(M+z+RI422jr00x^^n07CaFYgJSA$K zbeyMiH}r0vrz&Sz)eJyFB^0J6QCuz|6}!se8LM^KBCg6}r=)RDR4PQ#HS`)lCaksd znquAFuIu)&SiMJ&cRVkd_Au<3OF^n!Zx|gNR%+1w{Wgd!#KftfF(L14Uoa!lwP4eF z%N}tB;P%+?F1R+Y6Pku)DDA3`J~6FR&!wvatwhkpJMA~e@|||iu`!+YuOx0=G;5iT zLO02M`4jS(+BEk)FR8V3--B?ffvv`UFK?9l63>qN&TG77q~?{4GOwptsPoKgUZK!u z#=Op<{4yQ&>d*Fn-}z6W<4}hdoPM6{{saN<&6~O=y8yGX<1zf`VrL^_89c6TE*|(Y jc)a$F-5+zeckB||+dCg23r@>ZLF0y4{btWHDZlY2SA72; diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 2f60b6372c..685898b122 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -58,9 +58,12 @@ type Message struct { // Metadata has key/value metadata for the message. Metadata map[string]string + // processingStartTime is the time that this message was returned + // from Receive, or the zero time if it wasn't. + processingStartTime time.Time + // ack is a closure that queues this message for acknowledgement. 
ack func() - // mu guards isAcked in case Ack() is called concurrently. mu sync.Mutex @@ -234,31 +237,31 @@ type Subscription struct { ackBatcher driver.Batcher cancel func() // for canceling all SendAcks calls - mu sync.Mutex // protects everything below - q []*Message // local queue of messages downloaded from server - err error // permanent error - waitc chan struct{} // for goroutines waiting on ReceiveBatch - lastReceiveReturn time.Time // time that the last call to Receive returned - avgProcessTime float64 // moving average of the seconds to process a message + mu sync.Mutex // protects everything below + q []*Message // local queue of messages downloaded from server + err error // permanent error + waitc chan struct{} // for goroutines waiting on ReceiveBatch + avgProcessTime float64 // moving average of the seconds to process a message } const ( - // The desired length of a subscription's queue of messages (the messages pulled - // and waiting in memory to be doled out to Receive callers). The length is - // expressed in time; the relationship to number of messages is + // The desired duration of a subscription's queue of messages (the messages pulled + // and waiting in memory to be doled out to Receive callers). This is how long + // it would take to drain the queue at the current processing rate. + // The relationship to queue length (number of messages) is // - // lengthInMessages = desiredQueueLength / averageProcessTimePerMessage + // lengthInMessages = desiredQueueDuration / averageProcessTimePerMessage // // In other words, if it takes 100ms to process a message on average, and we want - // 2s worth of queued messages, then we need 2/.1 = 20 messages. + // 2s worth of queued messages, then we need 2/.1 = 20 messages in the queue. // - // If desiredQueueLength is too small, then there won't be a large enough buffer + // If desiredQueueDuration is too small, then there won't be a large enough buffer // of messages to handle fluctuations in processing time, and the queue is likely - // to become empty, reducing throughput. If desiredQueueLength is too large, then + // to become empty, reducing throughput. If desiredQueueDuration is too large, then // messages will wait in memory for a long time, possibly timing out (that is, // their ack deadline will be exceeded). Those messages could have been handled // by another process receiving from the same subscription. - desiredQueueLength = 2 * time.Second + desiredQueueDuration = 2 * time.Second // The factor by which old points decay when a new point is added to the moving // average. The larger this number, the more weight will be given to the newest @@ -277,13 +280,6 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { s.mu.Lock() defer s.mu.Unlock() - defer func() { s.lastReceiveReturn = time.Now() }() - - if !s.lastReceiveReturn.IsZero() { - t := float64(time.Since(s.lastReceiveReturn).Seconds()) - s.avgProcessTime = s.avgProcessTime*(1-decay) + t*decay - } - for { // The lock is always held here, at the top of the loop. if s.err != nil { @@ -294,6 +290,7 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { // At least one message is available. Return it. m := s.q[0] s.q = s.q[1:] + m.processingStartTime = time.Now() // TODO(jba): pre-fetch more messages if the queue gets too small. 
return m, nil } @@ -320,7 +317,7 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) { nMessages := 1 if s.avgProcessTime > 0 { // Using Ceil guarantees at least one message. - n := math.Ceil(desiredQueueLength.Seconds() / s.avgProcessTime) + n := math.Ceil(desiredQueueDuration.Seconds() / s.avgProcessTime) // Cap nMessages at some non-ridiculous value. // Slight hack: we should be using a larger cap, like MaxInt32. But // that messes up replay: since the tests take very little time to ack, @@ -367,15 +364,24 @@ func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Mess var q []*Message for _, m := range msgs { id := m.AckID - q = append(q, &Message{ + m2 := &Message{ Body: m.Body, Metadata: m.Metadata, - ack: func() { - // Ignore the error channel. Errors are dealt with - // in the ackBatcher handler. - _ = s.ackBatcher.AddNoWait(id) - }, - }) + } + m2.ack = func() { + t := float64(time.Since(m2.processingStartTime).Seconds()) + // Note: m2's mutex is locked here as well. Deadlock will + // result if Message.Ack is ever called with s.mu held. That currently + // cannot happen, but we should be careful if/when implementing features + // like auto-ack. + s.mu.Lock() + s.avgProcessTime = s.avgProcessTime*(1-decay) + t*decay + s.mu.Unlock() + // Ignore the error channel. Errors are dealt with + // in the ackBatcher handler. + _ = s.ackBatcher.AddNoWait(id) + } + q = append(q, m2) } return q, nil } From 891db08718f0bc666fdd85426a3dd6eaf2dc0deb Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Wed, 30 Jan 2019 17:51:27 -0500 Subject: [PATCH 5/5] init avg time to first point --- pubsub/pubsub.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 685898b122..9ddbfd6309 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -269,6 +269,17 @@ const ( decay = 0.05 ) +// Add message processing time d to the weighted moving average. +func (s *Subscription) addProcessingTime(d time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + if s.avgProcessTime == 0 { + s.avgProcessTime = d.Seconds() + } else { + s.avgProcessTime = s.avgProcessTime*(1-decay) + d.Seconds()*decay + } +} + // Receive receives and returns the next message from the Subscription's queue, // blocking and polling if none are available. This method can be called // concurrently from multiple goroutines. The Ack() method of the returned @@ -369,14 +380,12 @@ func (s *Subscription) getNextBatch(ctx context.Context, nMessages int) ([]*Mess Metadata: m.Metadata, } m2.ack = func() { - t := float64(time.Since(m2.processingStartTime).Seconds()) - // Note: m2's mutex is locked here as well. Deadlock will - // result if Message.Ack is ever called with s.mu held. That currently - // cannot happen, but we should be careful if/when implementing features - // like auto-ack. - s.mu.Lock() - s.avgProcessTime = s.avgProcessTime*(1-decay) + t*decay - s.mu.Unlock() + // Note: This call locks s.mu, and m2.mu is locked here as well. Deadlock + // will result if Message.Ack is ever called with s.mu held. That + // currently cannot happen, but we should be careful if/when implementing + // features like auto-ack. + s.addProcessingTime(time.Since(m2.processingStartTime)) + // Ignore the error channel. Errors are dealt with // in the ackBatcher handler. _ = s.ackBatcher.AddNoWait(id)