Skip to content

Commit b085384

Browse files
authored
feat(bigquery/storage/managedwriter): add append stream plumbing (#4452)
This PR adds enough of the wiring to the client to being testing via integration tests. It adapts a similar pattern to the pullstream in pubsub, in that it abstracts individual calls from stream state management. There's two significant units of future work that may yield changes here: * For traffic efficiency sake, we only want to add things like the stream ID, schema, and trace ID to the first append on any stream. * For stream connection retry, we may want to re-send writes that were sent but we didn't get an acknowledgement back. For default/committed streams, this behavior may yield additional writes (at least once semantics). For buffered/pending streams, it means either the library or user should know to expect "data already present" for these resent-writes. Towards #4366
1 parent e5019de commit b085384

10 files changed

+910
-21
lines changed

Diff for: bigquery/storage/managedwriter/client.go

+52-19
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"strings"
2222

2323
storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
24+
"github.com/googleapis/gax-go/v2"
2425
"google.golang.org/api/option"
2526
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
27+
"google.golang.org/grpc"
2628
)
2729

2830
// Client is a managed BigQuery Storage write client scoped to a single project.
@@ -53,41 +55,72 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
5355
}, nil
5456
}
5557

58+
// Close releases resources held by the client.
59+
func (c *Client) Close() error {
60+
// TODO: consider if we should propagate a cancellation from client to all associated managed streams.
61+
if c.rawClient == nil {
62+
return fmt.Errorf("already closed")
63+
}
64+
c.rawClient.Close()
65+
c.rawClient = nil
66+
return nil
67+
}
68+
5669
// NewManagedStream establishes a new managed stream for appending data into a table.
70+
//
71+
// Context here is retained for use by the underlying streaming connections the managed stream may create.
5772
func (c *Client) NewManagedStream(ctx context.Context, opts ...WriterOption) (*ManagedStream, error) {
73+
return c.buildManagedStream(ctx, c.rawClient.AppendRows, false, opts...)
74+
}
75+
76+
func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClientFunc, skipSetup bool, opts ...WriterOption) (*ManagedStream, error) {
77+
78+
ctx, cancel := context.WithCancel(ctx)
5879

5980
ms := &ManagedStream{
6081
streamSettings: defaultStreamSettings(),
6182
c: c,
83+
ctx: ctx,
84+
cancel: cancel,
85+
open: func() (storagepb.BigQueryWrite_AppendRowsClient, error) {
86+
arc, err := streamFunc(ctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10*1024*1024)))
87+
if err != nil {
88+
return nil, err
89+
}
90+
return arc, nil
91+
},
6292
}
6393

6494
// apply writer options
6595
for _, opt := range opts {
6696
opt(ms)
6797
}
6898

69-
if err := c.validateOptions(ctx, ms); err != nil {
70-
return nil, err
71-
}
99+
// skipSetup exists for testing scenarios.
100+
if !skipSetup {
101+
if err := c.validateOptions(ctx, ms); err != nil {
102+
return nil, err
103+
}
72104

73-
if ms.streamSettings.streamID == "" {
74-
// not instantiated with a stream, construct one.
75-
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
76-
if ms.streamSettings.streamType != DefaultStream {
77-
// For everything but a default stream, we create a new stream on behalf of the user.
78-
req := &storagepb.CreateWriteStreamRequest{
79-
Parent: ms.destinationTable,
80-
WriteStream: &storagepb.WriteStream{
81-
Type: streamTypeToEnum(ms.streamSettings.streamType),
82-
}}
83-
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
84-
if err != nil {
85-
return nil, fmt.Errorf("couldn't create write stream: %v", err)
105+
if ms.streamSettings.streamID == "" {
106+
// not instantiated with a stream, construct one.
107+
streamName := fmt.Sprintf("%s/_default", ms.destinationTable)
108+
if ms.streamSettings.streamType != DefaultStream {
109+
// For everything but a default stream, we create a new stream on behalf of the user.
110+
req := &storagepb.CreateWriteStreamRequest{
111+
Parent: ms.destinationTable,
112+
WriteStream: &storagepb.WriteStream{
113+
Type: streamTypeToEnum(ms.streamSettings.streamType),
114+
}}
115+
resp, err := ms.c.rawClient.CreateWriteStream(ctx, req)
116+
if err != nil {
117+
return nil, fmt.Errorf("couldn't create write stream: %v", err)
118+
}
119+
streamName = resp.GetName()
86120
}
87-
streamName = resp.GetName()
121+
ms.streamSettings.streamID = streamName
122+
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
88123
}
89-
ms.streamSettings.streamID = streamName
90-
// TODO(followup CLs): instantiate an appendstream client, flow controller, etc.
91124
}
92125

93126
return ms, nil

Diff for: bigquery/storage/managedwriter/doc.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414

1515
// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
1616
//
17-
// It is EXPERIMENTAL and subject to change or removal without notice.
17+
// It is EXPERIMENTAL and subject to change or removal without notice. This library is in a pre-alpha
18+
// state, and breaking changes are frequent.
1819
//
1920
// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint, and is
2021
// a successor to the streaming interface. API method tabledata.insertAll is the primary backend method, and

Diff for: bigquery/storage/managedwriter/integration_test.go

+207
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright 2021 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package managedwriter
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
"time"
22+
23+
"cloud.google.com/go/bigquery"
24+
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
25+
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
26+
"cloud.google.com/go/internal/testutil"
27+
"cloud.google.com/go/internal/uid"
28+
"google.golang.org/api/option"
29+
"google.golang.org/protobuf/proto"
30+
"google.golang.org/protobuf/reflect/protodesc"
31+
"google.golang.org/protobuf/reflect/protoreflect"
32+
"google.golang.org/protobuf/types/descriptorpb"
33+
)
34+
35+
// Shared fixtures for the integration tests: unique-name generators for
// datasets/tables (timestamped to avoid collisions across runs) and the
// default per-test timeout.
var (
	datasetIDs         = uid.NewSpace("managedwriter_test_dataset", &uid.Options{Sep: '_', Time: time.Now()})
	tableIDs           = uid.NewSpace("table", &uid.Options{Sep: '_', Time: time.Now()})
	defaultTestTimeout = 15 * time.Second
)
40+
41+
// getTestClients returns a managedwriter Client and a bigquery Client sharing
// the same project and credentials, skipping the test when integration
// prerequisites (project ID, token source) are unavailable or -short is set.
// Callers are responsible for closing both clients.
func getTestClients(ctx context.Context, t *testing.T, opts ...option.ClientOption) (*Client, *bigquery.Client) {
	if testing.Short() {
		t.Skip("Integration tests skipped in short mode")
	}
	projID := testutil.ProjID()
	if projID == "" {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	// Scope the token to BigQuery; a nil token source means no credentials
	// are configured in this environment.
	ts := testutil.TokenSource(ctx, "https://www.googleapis.com/auth/bigquery")
	if ts == nil {
		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
	}
	opts = append(opts, option.WithTokenSource(ts))
	client, err := NewClient(ctx, projID, opts...)
	if err != nil {
		t.Fatalf("couldn't create managedwriter client: %v", err)
	}

	bqClient, err := bigquery.NewClient(ctx, projID, opts...)
	if err != nil {
		t.Fatalf("couldn't create bigquery client: %v", err)
	}
	return client, bqClient
}
65+
66+
// validateRowCount confirms the number of rows in a table visible to the query engine.
67+
func validateRowCount(ctx context.Context, t *testing.T, client *bigquery.Client, tbl *bigquery.Table, expectedRows int64) {
68+
69+
// Verify data is present in the table with a count query.
70+
sql := fmt.Sprintf("SELECT COUNT(1) FROM `%s`.%s.%s", tbl.ProjectID, tbl.DatasetID, tbl.TableID)
71+
q := client.Query(sql)
72+
it, err := q.Read(ctx)
73+
if err != nil {
74+
t.Errorf("failed to issue validation query: %v", err)
75+
return
76+
}
77+
var rowdata []bigquery.Value
78+
err = it.Next(&rowdata)
79+
if err != nil {
80+
t.Errorf("error fetching validation results: %v", err)
81+
return
82+
}
83+
count, ok := rowdata[0].(int64)
84+
if !ok {
85+
t.Errorf("got unexpected data from validation query: %v", rowdata[0])
86+
}
87+
if count != expectedRows {
88+
t.Errorf("rows mismatch expected rows: got %d want %d", count, expectedRows)
89+
}
90+
}
91+
92+
// setupTestDataset generates a unique dataset for testing, and a cleanup that can be deferred.
93+
func setupTestDataset(ctx context.Context, t *testing.T, bqc *bigquery.Client) (ds *bigquery.Dataset, cleanup func(), err error) {
94+
dataset := bqc.Dataset(datasetIDs.New())
95+
if err := dataset.Create(ctx, nil); err != nil {
96+
return nil, nil, err
97+
}
98+
return dataset, func() {
99+
if err := dataset.DeleteWithContents(ctx); err != nil {
100+
t.Logf("could not cleanup dataset %s: %v", dataset.DatasetID, err)
101+
}
102+
}, nil
103+
}
104+
105+
// setupDynamicDescriptors aids testing when not using a supplied proto.
// It converts a bigquery.Schema into both a reflective message descriptor and
// its DescriptorProto form, failing the test on any conversion error.
func setupDynamicDescriptors(t *testing.T, schema bigquery.Schema) (protoreflect.MessageDescriptor, *descriptorpb.DescriptorProto) {
	// First translate the BigQuery schema into the storage API's table schema.
	convertedSchema, err := adapt.BQSchemaToStorageTableSchema(schema)
	if err != nil {
		t.Fatalf("adapt.BQSchemaToStorageTableSchema: %v", err)
	}

	// Then derive a message descriptor (named "root") from that table schema.
	descriptor, err := adapt.StorageSchemaToDescriptor(convertedSchema, "root")
	if err != nil {
		t.Fatalf("adapt.StorageSchemaToDescriptor: %v", err)
	}
	messageDescriptor, ok := descriptor.(protoreflect.MessageDescriptor)
	if !ok {
		t.Fatalf("adapted descriptor is not a message descriptor")
	}
	return messageDescriptor, protodesc.ToDescriptorProto(messageDescriptor)
}
122+
123+
func TestIntegration_ManagedWriter_BasicOperation(t *testing.T) {
124+
mwClient, bqClient := getTestClients(context.Background(), t)
125+
defer mwClient.Close()
126+
defer bqClient.Close()
127+
128+
dataset, cleanup, err := setupTestDataset(context.Background(), t, bqClient)
129+
if err != nil {
130+
t.Fatalf("failed to init test dataset: %v", err)
131+
}
132+
defer cleanup()
133+
134+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
135+
defer cancel()
136+
137+
// prep a suitable destination table.
138+
testTable := dataset.Table(tableIDs.New())
139+
schema := bigquery.Schema{
140+
{Name: "name", Type: bigquery.StringFieldType, Required: true},
141+
{Name: "value", Type: bigquery.IntegerFieldType, Required: true},
142+
}
143+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
144+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
145+
}
146+
// We'll use a precompiled test proto, but we need it's corresponding descriptorproto representation
147+
// to send as the stream's schema.
148+
m := &testdata.SimpleMessage{}
149+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
150+
151+
// setup a new stream.
152+
ms, err := mwClient.NewManagedStream(ctx,
153+
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
154+
WithType(DefaultStream),
155+
WithSchemaDescriptor(descriptorProto),
156+
)
157+
if err != nil {
158+
t.Fatalf("NewManagedStream: %v", err)
159+
}
160+
161+
// prevalidate we have no data in table.
162+
validateRowCount(ctx, t, bqClient, testTable, 0)
163+
164+
testData := []*testdata.SimpleMessage{
165+
{Name: "one", Value: 1},
166+
{Name: "two", Value: 2},
167+
{Name: "three", Value: 3},
168+
{Name: "four", Value: 1},
169+
{Name: "five", Value: 2},
170+
}
171+
172+
// First, send the test rows individually.
173+
var results []*AppendResult
174+
for k, mesg := range testData {
175+
b, err := proto.Marshal(mesg)
176+
if err != nil {
177+
t.Errorf("failed to marshal message %d: %v", k, err)
178+
}
179+
data := [][]byte{b}
180+
results, err = ms.AppendRows(data, NoStreamOffset)
181+
if err != nil {
182+
t.Errorf("single-row append %d failed: %v", k, err)
183+
}
184+
}
185+
// wait for the result to indicate ready, then validate.
186+
results[0].Ready()
187+
wantRows := int64(len(testData))
188+
validateRowCount(ctx, t, bqClient, testTable, wantRows)
189+
190+
// Now, send the test rows grouped into in a single append.
191+
var data [][]byte
192+
for k, mesg := range testData {
193+
b, err := proto.Marshal(mesg)
194+
if err != nil {
195+
t.Errorf("failed to marshal message %d: %v", k, err)
196+
}
197+
data := append(data, b)
198+
results, err = ms.AppendRows(data, NoStreamOffset)
199+
if err != nil {
200+
t.Errorf("grouped-row append failed: %v", err)
201+
}
202+
}
203+
// wait for the result to indicate ready, then validate again.
204+
results[0].Ready()
205+
wantRows = wantRows * 2
206+
validateRowCount(ctx, t, bqClient, testTable, wantRows)
207+
}

0 commit comments

Comments
 (0)