Skip to content

Commit 014b314

Browse files
authored
feat(bigquery/storage/managedwriter): support variadic appends (#5102)
* feat(bigquery/storage/managedwriter): support variadic AppendOption
1 parent c9cd984 commit 014b314

9 files changed

+362
-122
lines changed

Diff for: bigquery/storage/managedwriter/appendresult.go

+25-5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
2121
"google.golang.org/protobuf/proto"
22+
"google.golang.org/protobuf/types/descriptorpb"
2223
"google.golang.org/protobuf/types/known/wrapperspb"
2324
)
2425

@@ -66,7 +67,9 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
6667
// append request.
6768
type pendingWrite struct {
6869
request *storagepb.AppendRowsRequest
69-
result *AppendResult
70+
// for schema evolution cases, accept a new schema
71+
newSchema *descriptorpb.DescriptorProto
72+
result *AppendResult
7073

7174
// this is used by the flow controller.
7275
reqSize int
@@ -77,7 +80,7 @@ type pendingWrite struct {
7780
// that in the future, we may want to allow row batching to be managed by
7881
// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
7982
// streams, this should be managed by the user.
80-
func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
83+
func newPendingWrite(appends [][]byte) *pendingWrite {
8184
pw := &pendingWrite{
8285
request: &storagepb.AppendRowsRequest{
8386
Rows: &storagepb.AppendRowsRequest_ProtoRows{
@@ -90,9 +93,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {
9093
},
9194
result: newAppendResult(appends),
9295
}
93-
if offset > 0 {
94-
pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
95-
}
9696
// We compute the size now for flow controller purposes, though
9797
// the actual request size may be slightly larger (e.g. the first
9898
// request in a new stream bears schema and stream id).
@@ -114,3 +114,23 @@ func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowControlle
114114
fc.release(pw.reqSize)
115115
}
116116
}
117+
118+
// AppendOption is an option that can be passed when appending data with a managed stream instance.
// Each option is a function that receives the pendingWrite for the append and modifies it in place.
119+
type AppendOption func(*pendingWrite)
120+
121+
// UpdateSchemaDescriptor is used to update the descriptor message schema associated
122+
// with a given stream. The returned option records schema as the pending write's
// newSchema; how that value is later applied to the stream is not shown here.
123+
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
124+
return func(pw *pendingWrite) {
125+
pw.newSchema = schema
126+
}
127+
}
128+
129+
// WithOffset sets an explicit offset value for this append request.
// The offset is attached to the request unconditionally, whatever its value;
// omit this option entirely to send a request with no offset.
130+
func WithOffset(offset int64) AppendOption {
131+
return func(pw *pendingWrite) {
132+
pw.request.Offset = &wrapperspb.Int64Value{
133+
Value: offset,
134+
}
135+
}
136+
}

Diff for: bigquery/storage/managedwriter/appendresult_test.go

+32-28
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,33 @@ func TestPendingWrite(t *testing.T) {
4343
[]byte("row3"),
4444
}
4545

46-
var wantOffset int64 = 99
47-
48-
// first, verify no offset behavior
49-
pending := newPendingWrite(wantRowData, NoStreamOffset)
46+
// verify no offset behavior
47+
pending := newPendingWrite(wantRowData)
5048
if pending.request.GetOffset() != nil {
5149
t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
5250
}
53-
pending.markDone(NoStreamOffset, nil, nil)
54-
if pending.result.offset != NoStreamOffset {
55-
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
56-
}
57-
if pending.result.err != nil {
58-
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
59-
}
60-
61-
// now, verify behavior with a valid offset
62-
pending = newPendingWrite(wantRowData, 99)
63-
if pending.request.GetOffset() == nil {
64-
t.Errorf("offset not set, should be %d", wantOffset)
65-
}
66-
if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset {
67-
t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset)
68-
}
6951

70-
// check request shape
7152
gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows())
7253
if gotRowCount != len(wantRowData) {
7354
t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
7455
}
7556

76-
// verify AppendResult
57+
// Verify request is not acknowledged.
58+
select {
59+
case <-pending.result.Ready():
60+
t.Errorf("got Ready() on incomplete AppendResult")
61+
case <-time.After(100 * time.Millisecond):
62+
63+
}
7764

65+
// Mark completed, verify result.
66+
pending.markDone(NoStreamOffset, nil, nil)
67+
if pending.result.offset != NoStreamOffset {
68+
t.Errorf("mismatch on completed AppendResult without offset: got %d want %d", pending.result.offset, NoStreamOffset)
69+
}
70+
if pending.result.err != nil {
71+
t.Errorf("mismatch in error on AppendResult, got %v want nil", pending.result.err)
72+
}
7873
gotData := pending.result.rowData
7974
if len(gotData) != len(wantRowData) {
8075
t.Errorf("length mismatch on appendresult, got %d, want %d", len(gotData), len(wantRowData))
@@ -84,15 +79,24 @@ func TestPendingWrite(t *testing.T) {
8479
t.Errorf("row %d mismatch in data: got %q want %q", i, gotData[i], wantRowData[i])
8580
}
8681
}
87-
select {
88-
case <-pending.result.Ready():
89-
t.Errorf("got Ready() on incomplete AppendResult")
90-
case <-time.After(100 * time.Millisecond):
9182

92-
}
83+
// Create new write to verify error result.
84+
pending = newPendingWrite(wantRowData)
9385

94-
// verify completion behavior
86+
// Manually invoke option to apply offset to request.
87+
// This would normally be applied as part of the AppendRows() method on the managed stream.
9588
reportedOffset := int64(101)
89+
f := WithOffset(reportedOffset)
90+
f(pending)
91+
92+
if pending.request.GetOffset() == nil {
93+
t.Errorf("expected offset, got none")
94+
}
95+
if pending.request.GetOffset().GetValue() != reportedOffset {
96+
t.Errorf("offset mismatch, got %d wanted %d", pending.request.GetOffset().GetValue(), reportedOffset)
97+
}
98+
99+
// Verify completion behavior with an error.
96100
wantErr := fmt.Errorf("foo")
97101
pending.markDone(reportedOffset, wantErr, nil)
98102

Diff for: bigquery/storage/managedwriter/integration_test.go

+99-9
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
144144
t.Parallel()
145145
testPendingStream(ctx, t, mwClient, bqClient, dataset)
146146
})
147+
t.Run("SchemaEvolution", func(t *testing.T) {
148+
t.Parallel()
149+
testSchemaEvolution(ctx, t, mwClient, bqClient, dataset)
150+
})
147151
t.Run("Instrumentation", func(t *testing.T) {
148152
// Don't run this in parallel, we only want to collect stats from this subtest.
149153
testInstrumentation(ctx, t, mwClient, bqClient, dataset)
@@ -181,7 +185,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
181185
t.Errorf("failed to marshal message %d: %v", k, err)
182186
}
183187
data := [][]byte{b}
184-
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
188+
result, err = ms.AppendRows(ctx, data)
185189
if err != nil {
186190
t.Errorf("single-row append %d failed: %v", k, err)
187191
}
@@ -201,7 +205,7 @@ func testDefaultStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
201205
}
202206
data = append(data, b)
203207
}
204-
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
208+
result, err = ms.AppendRows(ctx, data)
205209
if err != nil {
206210
t.Errorf("grouped-row append failed: %v", err)
207211
}
@@ -256,7 +260,7 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
256260
if err != nil {
257261
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
258262
}
259-
result, err = ms.AppendRows(ctx, [][]byte{b}, NoStreamOffset)
263+
result, err = ms.AppendRows(ctx, [][]byte{b})
260264
if err != nil {
261265
t.Errorf("single-row append %d failed: %v", k, err)
262266
}
@@ -305,7 +309,7 @@ func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqC
305309
t.Errorf("failed to marshal message %d: %v", k, err)
306310
}
307311
data := [][]byte{b}
308-
results, err := ms.AppendRows(ctx, data, NoStreamOffset)
312+
results, err := ms.AppendRows(ctx, data)
309313
if err != nil {
310314
t.Errorf("single-row append %d failed: %v", k, err)
311315
}
@@ -358,7 +362,7 @@ func testCommittedStream(ctx context.Context, t *testing.T, mwClient *Client, bq
358362
t.Errorf("failed to marshal message %d: %v", k, err)
359363
}
360364
data := [][]byte{b}
361-
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
365+
result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
362366
if err != nil {
363367
t.Errorf("single-row append %d failed: %v", k, err)
364368
}
@@ -397,12 +401,19 @@ func testPendingStream(ctx context.Context, t *testing.T, mwClient *Client, bqCl
397401
t.Errorf("failed to marshal message %d: %v", k, err)
398402
}
399403
data := [][]byte{b}
400-
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
404+
result, err = ms.AppendRows(ctx, data, WithOffset(int64(k)))
401405
if err != nil {
402406
t.Errorf("single-row append %d failed: %v", k, err)
403407
}
408+
// be explicit about waiting/checking each response.
409+
off, err := result.GetResult(ctx)
410+
if err != nil {
411+
t.Errorf("response %d error: %v", k, err)
412+
}
413+
if off != int64(k) {
414+
t.Errorf("offset mismatch, got %d want %d", off, k)
415+
}
404416
}
405-
result.Ready()
406417
wantRows := int64(len(testSimpleData))
407418

408419
// Mark stream complete.
@@ -468,7 +479,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
468479
t.Errorf("failed to marshal message %d: %v", k, err)
469480
}
470481
data := [][]byte{b}
471-
result, err = ms.AppendRows(ctx, data, NoStreamOffset)
482+
result, err = ms.AppendRows(ctx, data)
472483
if err != nil {
473484
t.Errorf("single-row append %d failed: %v", k, err)
474485
}
@@ -513,6 +524,85 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
513524
}
514525
}
515526

527+
// testSchemaEvolution exercises appending with an evolved schema on a committed stream.
// It creates a table with the simple schema, appends every row in testSimpleData,
// updates the table to the evolved schema, then sends one more append that carries the
// evolved descriptor via UpdateSchemaDescriptor, validating row counts at each stage.
func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
528+
testTable := dataset.Table(tableIDs.New())
529+
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
530+
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
531+
}
532+
533+
m := &testdata.SimpleMessageProto2{}
534+
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
535+
536+
// Set up a new committed stream that starts with the original (pre-evolution) descriptor.
537+
ms, err := mwClient.NewManagedStream(ctx,
538+
WithDestinationTable(fmt.Sprintf("projects/%s/datasets/%s/tables/%s", testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
539+
WithType(CommittedStream),
540+
WithSchemaDescriptor(descriptorProto),
541+
)
542+
if err != nil {
543+
t.Fatalf("NewManagedStream: %v", err)
544+
}
545+
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
546+
withExactRowCount(0))
547+
548+
var result *AppendResult
549+
for k, mesg := range testSimpleData {
550+
b, err := proto.Marshal(mesg)
551+
if err != nil {
552+
t.Errorf("failed to marshal message %d: %v", k, err)
553+
}
554+
data := [][]byte{b}
555+
result, err = ms.AppendRows(ctx, data)
556+
if err != nil {
557+
t.Errorf("single-row append %d failed: %v", k, err)
558+
}
559+
}
560+
// Wait for the last append's result to indicate ready, then validate.
// NOTE(review): append failures above are reported with t.Errorf, so execution continues;
// if AppendRows returns a nil result on error, this call may dereference nil — confirm.
561+
_, err = result.GetResult(ctx)
562+
if err != nil {
563+
t.Errorf("error on append: %v", err)
564+
}
565+
566+
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
567+
withExactRowCount(int64(len(testSimpleData))))
568+
569+
// Now, evolve the underlying table schema.
570+
_, err = testTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: testdata.SimpleMessageEvolvedSchema}, "")
571+
if err != nil {
572+
t.Errorf("failed to evolve table schema: %v", err)
573+
}
574+
575+
// TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change.
576+
// In the continuous case, we'd get it from the response, but the change-and-wait case needs something more.
// Until then, a fixed sleep stands in for that detection.
577+
time.Sleep(6 * time.Second)
578+
579+
// Build a message and descriptor for the evolved schema, then send one additional append.
580+
m2 := &testdata.SimpleMessageEvolvedProto2{
581+
Name: proto.String("evolved"),
582+
Value: proto.Int64(180),
583+
Other: proto.String("hello evolution"),
584+
}
585+
descriptorProto = protodesc.ToDescriptorProto(m2.ProtoReflect().Descriptor())
586+
b, err := proto.Marshal(m2)
587+
if err != nil {
588+
t.Errorf("failed to marshal evolved message: %v", err)
589+
}
// Pass the evolved descriptor along with this append via the UpdateSchemaDescriptor option.
590+
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
591+
if err != nil {
592+
t.Errorf("failed evolved append: %v", err)
593+
}
594+
_, err = result.GetResult(ctx)
595+
if err != nil {
596+
t.Errorf("error on evolved append: %v", err)
597+
}
598+
// Expect exactly one more row than before; the constraint helpers are passed the
// "name" and "other" columns with expected null / non-null counts.
599+
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
600+
withExactRowCount(int64(len(testSimpleData)+1)),
601+
withNullCount("name", 0),
602+
withNonNullCount("other", 1),
603+
)
604+
}
605+
516606
func TestIntegration_DetectProjectID(t *testing.T) {
517607
ctx := context.Background()
518608
testCreds := testutil.Credentials(ctx)
@@ -613,7 +703,7 @@ func testProtoNormalization(ctx context.Context, t *testing.T, mwClient *Client,
613703
if err != nil {
614704
t.Fatalf("NewManagedStream: %v", err)
615705
}
616-
result, err := ms.AppendRows(ctx, [][]byte{sampleRow}, NoStreamOffset)
706+
result, err := ms.AppendRows(ctx, [][]byte{sampleRow})
617707
if err != nil {
618708
t.Errorf("append failed: %v", err)
619709
}

0 commit comments

Comments
 (0)