Skip to content

Commit 9dc78e0

Browse files
authored
feature(bigquery/storage/managedwriter): add flow controller (#4404)
* feature(bigquery/storage/managedwriter): add flow controller This PR basically cribs the flow controller code from pubsub, modulo some minor changes. Managed writer needs to do the same management of traffic on a streaming rpc channel, though in this case we'll be dealing with row appends / responses, whereas in pubsub its messages and acknowledgements. * address reviewer comments
1 parent 4ff6243 commit 9dc78e0

File tree

4 files changed

+380
-0
lines changed

4 files changed

+380
-0
lines changed

Diff for: bigquery/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/golang/protobuf v1.5.2
99
github.com/google/go-cmp v0.5.6
1010
github.com/googleapis/gax-go/v2 v2.0.5
11+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
1112
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
1213
google.golang.org/api v0.50.0
1314
google.golang.org/genproto v0.0.0-20210707141755-0f065b0b1eb9

Diff for: bigquery/go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
275275
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
276276
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
277277
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
278+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
278279
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
279280
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
280281
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

Diff for: bigquery/storage/managedwriter/flow_controller.go

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
"sync/atomic"
20+
21+
"golang.org/x/sync/semaphore"
22+
)
23+
24+
// Flow controller for write API. Adapted from pubsub.
25+
type flowController struct {
26+
// The max number of pending write requests.
27+
maxInsertCount int
28+
// The max pending request bytes.
29+
maxInsertBytes int
30+
31+
// Semaphores for governing pending inserts.
32+
semInsertCount, semInsertBytes *semaphore.Weighted
33+
34+
countRemaining int64 // Atomic.
35+
}
36+
37+
func newFlowController(maxInserts, maxInsertBytes int) *flowController {
38+
fc := &flowController{
39+
maxInsertCount: maxInserts,
40+
maxInsertBytes: maxInsertBytes,
41+
semInsertCount: nil,
42+
semInsertBytes: nil,
43+
}
44+
if maxInserts > 0 {
45+
fc.semInsertCount = semaphore.NewWeighted(int64(maxInserts))
46+
}
47+
if maxInsertBytes > 0 {
48+
fc.semInsertBytes = semaphore.NewWeighted(int64(maxInsertBytes))
49+
}
50+
return fc
51+
}
52+
53+
// acquire blocks until one insert of size bytes can proceed or ctx is done.
54+
// It returns nil in the first case, or ctx.Err() in the second.
55+
//
56+
// acquire allows large messages to proceed by treating a size greater than maxSize
57+
// as if it were equal to maxSize.
58+
func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
59+
if fc.semInsertCount != nil {
60+
if err := fc.semInsertCount.Acquire(ctx, 1); err != nil {
61+
return err
62+
}
63+
}
64+
if fc.semInsertBytes != nil {
65+
if err := fc.semInsertBytes.Acquire(ctx, fc.bound(sizeBytes)); err != nil {
66+
if fc.semInsertCount != nil {
67+
fc.semInsertCount.Release(1)
68+
}
69+
return err
70+
}
71+
}
72+
atomic.AddInt64(&fc.countRemaining, 1)
73+
return nil
74+
}
75+
76+
// tryAcquire returns false if acquire would block. Otherwise, it behaves like
77+
// acquire and returns true.
78+
//
79+
// tryAcquire allows large inserts to proceed by treating a size greater than
80+
// maxSize as if it were equal to maxSize.
81+
func (fc *flowController) tryAcquire(sizeBytes int) bool {
82+
if fc.semInsertCount != nil {
83+
if !fc.semInsertCount.TryAcquire(1) {
84+
return false
85+
}
86+
}
87+
if fc.semInsertBytes != nil {
88+
if !fc.semInsertBytes.TryAcquire(fc.bound(sizeBytes)) {
89+
if fc.semInsertCount != nil {
90+
fc.semInsertCount.Release(1)
91+
}
92+
return false
93+
}
94+
}
95+
atomic.AddInt64(&fc.countRemaining, 1)
96+
return true
97+
}
98+
99+
func (fc *flowController) release(sizeBytes int) {
100+
atomic.AddInt64(&fc.countRemaining, -1)
101+
if fc.semInsertCount != nil {
102+
fc.semInsertCount.Release(1)
103+
}
104+
if fc.semInsertBytes != nil {
105+
fc.semInsertBytes.Release(fc.bound(sizeBytes))
106+
}
107+
}
108+
109+
// bound normalizes input size to maxInsertBytes if it exceeds the limit.
110+
func (fc *flowController) bound(sizeBytes int) int64 {
111+
if sizeBytes > fc.maxInsertBytes {
112+
return int64(fc.maxInsertBytes)
113+
}
114+
return int64(sizeBytes)
115+
}
116+
117+
func (fc *flowController) count() int {
118+
return int(atomic.LoadInt64(&fc.countRemaining))
119+
}
+259
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
"sync/atomic"
22+
"testing"
23+
"time"
24+
25+
"golang.org/x/sync/errgroup"
26+
)
27+
28+
func TestFlowControllerCancel(t *testing.T) {
29+
// Test canceling a flow controller's context.
30+
t.Parallel()
31+
wantInsertBytes := 10
32+
fc := newFlowController(3, wantInsertBytes)
33+
if fc.maxInsertBytes != 10 {
34+
t.Fatalf("maxInsertBytes mismatch, got %d want %d", fc.maxInsertBytes, wantInsertBytes)
35+
}
36+
if err := fc.acquire(context.Background(), 5); err != nil {
37+
t.Fatal(err)
38+
}
39+
// Experiment: a context that times out should always return an error.
40+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
41+
defer cancel()
42+
if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded {
43+
t.Fatalf("got %v, expected DeadlineExceeded", err)
44+
}
45+
// Control: a context that is not done should always return nil.
46+
go func() {
47+
time.Sleep(5 * time.Millisecond)
48+
fc.release(5)
49+
}()
50+
if err := fc.acquire(context.Background(), 6); err != nil {
51+
t.Errorf("got %v, expected nil", err)
52+
}
53+
}
54+
55+
func TestFlowControllerLargeRequest(t *testing.T) {
56+
// Large requests succeed, consuming the entire allotment.
57+
t.Parallel()
58+
fc := newFlowController(3, 10)
59+
err := fc.acquire(context.Background(), 11)
60+
if err != nil {
61+
t.Fatal(err)
62+
}
63+
}
64+
65+
func TestFlowControllerNoStarve(t *testing.T) {
66+
// A large request won't starve, because the flowController is
67+
// (best-effort) FIFO.
68+
t.Parallel()
69+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
70+
defer cancel()
71+
fc := newFlowController(10, 10)
72+
first := make(chan int)
73+
for i := 0; i < 20; i++ {
74+
go func() {
75+
for {
76+
if err := fc.acquire(ctx, 1); err != nil {
77+
if err != context.Canceled {
78+
t.Error(err)
79+
}
80+
return
81+
}
82+
select {
83+
case first <- 1:
84+
default:
85+
}
86+
fc.release(1)
87+
}
88+
}()
89+
}
90+
<-first // Wait until the flowController's state is non-zero.
91+
if err := fc.acquire(ctx, 11); err != nil {
92+
t.Errorf("got %v, want nil", err)
93+
}
94+
}
95+
96+
func TestFlowControllerSaturation(t *testing.T) {
97+
t.Parallel()
98+
const (
99+
maxCount = 6
100+
maxSize = 10
101+
)
102+
for _, test := range []struct {
103+
acquireSize int
104+
wantCount, wantSize int64
105+
}{
106+
{
107+
// Many small acquires cause the flow controller to reach its max count.
108+
acquireSize: 1,
109+
wantCount: 6,
110+
wantSize: 6,
111+
},
112+
{
113+
// Five acquires of size 2 will cause the flow controller to reach its max size,
114+
// but not its max count.
115+
acquireSize: 2,
116+
wantCount: 5,
117+
wantSize: 10,
118+
},
119+
{
120+
// If the requests are the right size (relatively prime to maxSize),
121+
// the flow controller will not saturate on size. (In this case, not on count either.)
122+
acquireSize: 3,
123+
wantCount: 3,
124+
wantSize: 9,
125+
},
126+
} {
127+
fc := newFlowController(maxCount, maxSize)
128+
// Atomically track flow controller state.
129+
// The flowController itself tracks count.
130+
var curSize int64
131+
success := errors.New("")
132+
// Time out if wantSize or wantCount is never reached.
133+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
134+
defer cancel()
135+
g, ctx := errgroup.WithContext(ctx)
136+
for i := 0; i < 10; i++ {
137+
g.Go(func() error {
138+
var hitCount, hitSize bool
139+
// Run at least until we hit the expected values, and at least
140+
// for enough iterations to exceed them if the flow controller
141+
// is broken.
142+
for i := 0; i < 100 || !hitCount || !hitSize; i++ {
143+
select {
144+
case <-ctx.Done():
145+
return ctx.Err()
146+
default:
147+
}
148+
if err := fc.acquire(ctx, test.acquireSize); err != nil {
149+
return err
150+
}
151+
c := int64(fc.count())
152+
if c > test.wantCount {
153+
return fmt.Errorf("count %d exceeds want %d", c, test.wantCount)
154+
}
155+
if c == test.wantCount {
156+
hitCount = true
157+
}
158+
s := atomic.AddInt64(&curSize, int64(test.acquireSize))
159+
if s > test.wantSize {
160+
return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
161+
}
162+
if s == test.wantSize {
163+
hitSize = true
164+
}
165+
time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
166+
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
167+
return errors.New("negative size")
168+
}
169+
fc.release(test.acquireSize)
170+
}
171+
return success
172+
})
173+
}
174+
if err := g.Wait(); err != success {
175+
t.Errorf("%+v: %v", test, err)
176+
continue
177+
}
178+
}
179+
}
180+
181+
func TestFlowControllerTryAcquire(t *testing.T) {
182+
t.Parallel()
183+
fc := newFlowController(3, 10)
184+
185+
// Successfully tryAcquire 4 bytes.
186+
if !fc.tryAcquire(4) {
187+
t.Error("got false, wanted true")
188+
}
189+
190+
// Fail to tryAcquire 7 bytes.
191+
if fc.tryAcquire(7) {
192+
t.Error("got true, wanted false")
193+
}
194+
195+
// Successfully tryAcquire 6 byte.
196+
if !fc.tryAcquire(6) {
197+
t.Error("got false, wanted true")
198+
}
199+
}
200+
201+
func TestFlowControllerUnboundedCount(t *testing.T) {
202+
t.Parallel()
203+
ctx := context.Background()
204+
fc := newFlowController(0, 10)
205+
206+
// Successfully acquire 4 bytes.
207+
if err := fc.acquire(ctx, 4); err != nil {
208+
t.Errorf("got %v, wanted no error", err)
209+
}
210+
211+
// Successfully tryAcquire 4 bytes.
212+
if !fc.tryAcquire(4) {
213+
t.Error("got false, wanted true")
214+
}
215+
216+
// Fail to tryAcquire 3 bytes.
217+
if fc.tryAcquire(3) {
218+
t.Error("got true, wanted false")
219+
}
220+
}
221+
222+
func TestFlowControllerUnboundedCount2(t *testing.T) {
223+
t.Parallel()
224+
ctx := context.Background()
225+
fc := newFlowController(0, 0)
226+
// Successfully acquire 4 bytes.
227+
if err := fc.acquire(ctx, 4); err != nil {
228+
t.Errorf("got %v, wanted no error", err)
229+
}
230+
fc.release(1)
231+
fc.release(1)
232+
fc.release(1)
233+
wantCount := int64(-2)
234+
c := int64(fc.count())
235+
if c != wantCount {
236+
t.Fatalf("got count %d, want %d", c, wantCount)
237+
}
238+
}
239+
240+
func TestFlowControllerUnboundedBytes(t *testing.T) {
241+
t.Parallel()
242+
ctx := context.Background()
243+
fc := newFlowController(2, 0)
244+
245+
// Successfully acquire 4GB.
246+
if err := fc.acquire(ctx, 4e9); err != nil {
247+
t.Errorf("got %v, wanted no error", err)
248+
}
249+
250+
// Successfully tryAcquire 4GB bytes.
251+
if !fc.tryAcquire(4e9) {
252+
t.Error("got false, wanted true")
253+
}
254+
255+
// Fail to tryAcquire a third message.
256+
if fc.tryAcquire(3) {
257+
t.Error("got true, wanted false")
258+
}
259+
}

0 commit comments

Comments
 (0)