@@ -71,6 +71,7 @@ type ManagedStream struct {
71
71
schemaDescriptor * descriptorpb.DescriptorProto
72
72
destinationTable string
73
73
c * Client
74
+ fc * flowController
74
75
75
76
// aspects of the stream client
76
77
ctx context.Context // retained context for the stream
@@ -204,7 +205,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
204
205
// The channel relationship with its ARC is 1:1. If we get a new ARC, create a new chan
205
206
// and fire up the associated receive processor.
206
207
ch := make (chan * pendingWrite )
207
- go recvProcessor (ms .ctx , arc , ch )
208
+ go recvProcessor (ms .ctx , arc , ms . fc , ch )
208
209
// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
209
210
// for every new connection.
210
211
ms .streamSetup = new (sync.Once )
@@ -293,11 +294,17 @@ func (ms *ManagedStream) Close() error {
293
294
}
294
295
295
296
// AppendRows sends the append requests to the service, and returns one AppendResult per row.
296
- func (ms * ManagedStream ) AppendRows (data [][]byte , offset int64 ) ([]* AppendResult , error ) {
297
+ func (ms * ManagedStream ) AppendRows (ctx context. Context , data [][]byte , offset int64 ) ([]* AppendResult , error ) {
297
298
pw := newPendingWrite (data , offset )
299
+ // check flow control
300
+ if err := ms .fc .acquire (ctx , pw .reqSize ); err != nil {
301
+ // in this case, we didn't acquire, so don't pass the flow controller reference to avoid a release.
302
+ pw .markDone (NoStreamOffset , err , nil )
303
+ }
304
+ // proceed to call
298
305
if err := ms .append (pw ); err != nil {
299
- // pending write is DOA, mark it done .
300
- pw .markDone (NoStreamOffset , err )
306
+ // pending write is DOA.
307
+ pw .markDone (NoStreamOffset , err , ms . fc )
301
308
return nil , err
302
309
}
303
310
return pw .results , nil
@@ -307,7 +314,7 @@ func (ms *ManagedStream) AppendRows(data [][]byte, offset int64) ([]*AppendResul
307
314
//
308
315
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
309
316
// with the mutex lock.
310
- func recvProcessor (ctx context.Context , arc storagepb.BigQueryWrite_AppendRowsClient , ch <- chan * pendingWrite ) {
317
+ func recvProcessor (ctx context.Context , arc storagepb.BigQueryWrite_AppendRowsClient , fc * flowController , ch <- chan * pendingWrite ) {
311
318
// TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply
312
319
// ensure that pending writes get acknowledged with a terminal state.
313
320
for {
@@ -319,7 +326,7 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
319
326
if ! ok {
320
327
return
321
328
}
322
- pw .markDone (NoStreamOffset , ctx .Err ())
329
+ pw .markDone (NoStreamOffset , ctx .Err (), fc )
323
330
}
324
331
case nextWrite , ok := <- ch :
325
332
if ! ok {
@@ -330,19 +337,20 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
330
337
// block until we get a corresponding response or err from stream.
331
338
resp , err := arc .Recv ()
332
339
if err != nil {
333
- nextWrite .markDone (NoStreamOffset , err )
340
+ nextWrite .markDone (NoStreamOffset , err , fc )
334
341
}
335
342
336
343
if status := resp .GetError (); status != nil {
337
- nextWrite .markDone (NoStreamOffset , grpcstatus .ErrorProto (status ))
344
+ fc .release (nextWrite .reqSize )
345
+ nextWrite .markDone (NoStreamOffset , grpcstatus .ErrorProto (status ), fc )
338
346
continue
339
347
}
340
348
success := resp .GetAppendResult ()
341
349
off := success .GetOffset ()
342
350
if off != nil {
343
- nextWrite .markDone (off .GetValue (), nil )
351
+ nextWrite .markDone (off .GetValue (), nil , fc )
344
352
}
345
- nextWrite .markDone (NoStreamOffset , nil )
353
+ nextWrite .markDone (NoStreamOffset , nil , fc )
346
354
}
347
355
}
348
356
}
0 commit comments