Skip to content

Commit fbc2717

Browse files
authored
feat(bigquery/storage/managedwriter): extend managedstream to support call options (#5078)
BREAKING CHANGE: changes function signatures to add variadic call options This plumbs the ability to pass gax.CallOption opts to the underlying client underpinning the ManagedStream. It also adds a WithAppendRowsCallOption option to the constructor, as well as adding direct option passing for operations like Finalize() and FlushRows(). Towards: #4366
1 parent bbf4d04 commit fbc2717

File tree

5 files changed

+44
-14
lines changed

5 files changed

+44
-14
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
9898
c: c,
9999
ctx: ctx,
100100
cancel: cancel,
101-
open: func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
101+
callOptions: []gax.CallOption{
102+
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
103+
},
104+
open: func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
102105
arc, err := streamFunc(
103106
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
104-
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)),
105-
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
107+
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)))
106108
if err != nil {
107109
return nil, err
108110
}

Diff for: bigquery/storage/managedwriter/managed_stream.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,10 @@ type ManagedStream struct {
7979
fc *flowController
8080

8181
// aspects of the stream client
82-
ctx context.Context // retained context for the stream
83-
cancel context.CancelFunc
84-
open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
82+
ctx context.Context // retained context for the stream
83+
cancel context.CancelFunc
84+
callOptions []gax.CallOption // options passed when opening an append client
85+
open func(streamID string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
8586

8687
mu sync.Mutex
8788
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
@@ -141,14 +142,14 @@ func (ms *ManagedStream) StreamType() StreamType {
141142

142143
// FlushRows advances the offset at which rows in a BufferedStream are visible. Calling
143144
// this method for other stream types yields an error.
144-
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, error) {
145+
func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...gax.CallOption) (int64, error) {
145146
req := &storagepb.FlushRowsRequest{
146147
WriteStream: ms.streamSettings.streamID,
147148
Offset: &wrapperspb.Int64Value{
148149
Value: offset,
149150
},
150151
}
151-
resp, err := ms.c.rawClient.FlushRows(ctx, req)
152+
resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
152153
recordStat(ms.ctx, FlushRequests, 1)
153154
if err != nil {
154155
return 0, err
@@ -161,12 +162,12 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64) (int64, er
161162
//
162163
// Finalizing does not advance the current offset of a BufferedStream, nor does it commit
163164
// data in a PendingStream.
164-
func (ms *ManagedStream) Finalize(ctx context.Context) (int64, error) {
165+
func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (int64, error) {
165166
// TODO: consider blocking for in-flight appends once we have an appendStream plumbed in.
166167
req := &storagepb.FinalizeWriteStreamRequest{
167168
Name: ms.streamSettings.streamID,
168169
}
169-
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req)
170+
resp, err := ms.c.rawClient.FinalizeWriteStream(ctx, req, opts...)
170171
if err != nil {
171172
return 0, err
172173
}
@@ -208,7 +209,7 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
208209
if ms.streamSettings != nil {
209210
streamID = ms.streamSettings.streamID
210211
}
211-
arc, err := ms.open(streamID)
212+
arc, err := ms.open(streamID, ms.callOptions...)
212213
bo, shouldRetry := r.Retry(err)
213214
if err != nil && shouldRetry {
214215
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)

Diff for: bigquery/storage/managedwriter/managed_stream_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"context"
1919
"testing"
2020

21+
"github.com/googleapis/gax-go/v2"
2122
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
2223
"google.golang.org/grpc/codes"
2324
"google.golang.org/grpc/status"
@@ -55,7 +56,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {
5556
for _, tc := range testCases {
5657
ms := &ManagedStream{
5758
ctx: context.Background(),
58-
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
59+
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
5960
if len(tc.errors) == 0 {
6061
panic("out of errors")
6162
}
@@ -107,7 +108,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
107108

108109
ms := &ManagedStream{
109110
ctx: ctx,
110-
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
111+
open: func(s string, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
111112
testARC.openCount = testARC.openCount + 1
112113
return testARC, nil
113114
},

Diff for: bigquery/storage/managedwriter/writer_option.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414

1515
package managedwriter
1616

17-
import "google.golang.org/protobuf/types/descriptorpb"
17+
import (
18+
"github.com/googleapis/gax-go/v2"
19+
"google.golang.org/protobuf/types/descriptorpb"
20+
)
1821

1922
// WriterOption are variadic options used to configure a ManagedStream instance.
2023
type WriterOption func(*ManagedStream)
@@ -85,3 +88,11 @@ func WithDataOrigin(dataOrigin string) WriterOption {
8588
ms.streamSettings.dataOrigin = dataOrigin
8689
}
8790
}
91+
92+
// WithAppendRowsCallOption is used to supply additional call options to the ManagedStream when
93+
// it opens the underlying append stream.
94+
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
95+
return func(ms *ManagedStream) {
96+
ms.callOptions = append(ms.callOptions, o)
97+
}
98+
}

Diff for: bigquery/storage/managedwriter/writer_option_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"testing"
2020

2121
"github.com/google/go-cmp/cmp"
22+
"github.com/googleapis/gax-go/v2"
23+
"google.golang.org/grpc"
2224
)
2325

2426
func TestWriterOptions(t *testing.T) {
@@ -94,6 +96,19 @@ func TestWriterOptions(t *testing.T) {
9496
return ms
9597
}(),
9698
},
99+
{
100+
desc: "WithCallOption",
101+
options: []WriterOption{WithAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))},
102+
want: func() *ManagedStream {
103+
ms := &ManagedStream{
104+
streamSettings: defaultStreamSettings(),
105+
callOptions: []gax.CallOption{
106+
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
107+
},
108+
}
109+
return ms
110+
}(),
111+
},
97112
{
98113
desc: "multiple",
99114
options: []WriterOption{

0 commit comments

Comments
 (0)