Skip to content

Commit 68418f9

Browse files
authored
fix(bigquery/storage/managedwriter): support non-default regions (#4566)
Additional internal context: b/185842996 Request routing relies on a metadata header being present, but because of the bidi nature library generators don't automatically attach the write stream metadata into x-goog-request-headers. For this API, the stream ID is constant for the whole stream so we inject it when opening the stream, which is when routing needs the information. This causes some minor changes to how we do stream (re)open because we need to pass in the stream ID as part of the function. This change also updates integration testing so that we're testing in an explicit, non-default region (us-east1). Towards: #4366
1 parent 381a494 commit 68418f9

File tree

4 files changed

+17
-9
lines changed

4 files changed

+17
-9
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"google.golang.org/api/option"
2626
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
2727
"google.golang.org/grpc"
28+
"google.golang.org/grpc/metadata"
2829
)
2930

3031
// Client is a managed BigQuery Storage write client scoped to a single project.
@@ -81,8 +82,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
8182
c: c,
8283
ctx: ctx,
8384
cancel: cancel,
84-
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
85-
arc, err := streamFunc(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
85+
open: func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
86+
arc, err := streamFunc(
87+
// Bidi Streaming doesn't append stream ID as request metadata, so we must inject it manually.
88+
metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_stream=%s", streamID)),
89+
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
8690
if err != nil {
8791
return nil, err
8892
}

Diff for: bigquery/storage/managedwriter/integration_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOpti
8282
}
8383

8484
// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
85-
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) {
85+
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client, location string) (ds *bigquery.Dataset, cleanup func(), err error) {
8686
dataset := bqc.Dataset(datasetIDs.New())
87-
if err := dataset.Create(ctx, nil); err != nil {
87+
if err := dataset.Create(ctx, &bigquery.DatasetMetadata{Location: location}); err != nil {
8888
return nil, nil, err
8989
}
9090
return dataset, func() {
@@ -117,7 +117,7 @@ func TestIntegration_ManagedWriter(t *testing.T) {
117117
defer mwClient.Close()
118118
defer bqClient.Close()
119119

120-
dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
120+
dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient, "us-east1")
121121
if err != nil {
122122
t.Fatalf("failed to init test dataset: %v", err)
123123
}

Diff for: bigquery/storage/managedwriter/managed_stream.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type ManagedStream struct {
7979
// aspects of the stream client
8080
ctx context.Context // retained context for the stream
8181
cancel context.CancelFunc
82-
open func() (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
82+
open func(streamID string) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
8383

8484
mu sync.Mutex
8585
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
@@ -198,7 +198,11 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
198198
r := defaultRetryer{}
199199
for {
200200
recordStat(ms.ctx, AppendClientOpenCount, 1)
201-
arc, err := ms.open()
201+
streamID := ""
202+
if ms.streamSettings != nil {
203+
streamID = ms.streamSettings.streamID
204+
}
205+
arc, err := ms.open(streamID)
202206
bo, shouldRetry := r.Retry(err)
203207
if err != nil && shouldRetry {
204208
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)

Diff for: bigquery/storage/managedwriter/managed_stream_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestManagedStream_OpenWithRetry(t *testing.T) {
5555
for _, tc := range testCases {
5656
ms := &ManagedStream{
5757
ctx: context.Background(),
58-
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
58+
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
5959
if len(tc.errors) == 0 {
6060
panic("out of errors")
6161
}
@@ -107,7 +107,7 @@ func TestManagedStream_FirstAppendBehavior(t *testing.T) {
107107

108108
ms := &ManagedStream{
109109
ctx: ctx,
110-
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
110+
open: func(s string) (storagepb.BigQueryWrite_AppendRowsClient, error) {
111111
testARC.openCount = testARC.openCount + 1
112112
return testARC, nil
113113
},

0 commit comments

Comments
 (0)