Skip to content

Commit 70e40db

Browse files
authored
feat(bigquery/storage/managedwriter): support schema change notification (#5253)
* feat(bigquery/storage/managedwriter): support schema change notification. This adds a way for users to get schema changes by checking the AppendResult.
1 parent c276428 commit 70e40db

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

Diff for: bigquery/storage/managedwriter/appendresult.go

+15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package managedwriter
1616

1717
import (
1818
"context"
19+
"fmt"
1920

2021
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
2122
"google.golang.org/protobuf/proto"
@@ -38,6 +39,9 @@ type AppendResult struct {
3839

3940
// the stream offset
4041
offset int64
42+
43+
// updatedSchema retains the updated schema from the backend response, if one was supplied. Used for schema change notification.
44+
updatedSchema *storagepb.TableSchema
4145
}
4246

4347
func newAppendResult(data [][]byte) *AppendResult {
@@ -62,6 +66,17 @@ func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
6266
}
6367
}
6468

69+
// UpdatedSchema returns the updated schema for a table if supplied by the backend as part
70+
// of the append response, or a nil schema (with a nil error) if none was recorded. It blocks until the result is ready; if ctx is done first, it returns a nil schema and a non-nil error.
71+
func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSchema, error) {
72+
select {
73+
case <-ctx.Done():
// The caller's context finished before the result became ready.
74+
return nil, fmt.Errorf("context done")
75+
case <-ar.Ready():
// The result is ready; updatedSchema stays nil unless the response carried an updated schema.
76+
return ar.updatedSchema, nil
77+
}
78+
}
79+
6580
// pendingWrite tracks state for a set of rows that are part of a single
6681
// append request.
6782
type pendingWrite struct {

Diff for: bigquery/storage/managedwriter/integration_test.go

+31-6
Original file line numberDiff line numberDiff line change
@@ -546,16 +546,20 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
546546
withExactRowCount(0))
547547

548548
var result *AppendResult
549+
var curOffset int64
550+
var latestRow []byte
549551
for k, mesg := range testSimpleData {
550552
b, err := proto.Marshal(mesg)
551553
if err != nil {
552554
t.Errorf("failed to marshal message %d: %v", k, err)
553555
}
556+
latestRow = b
554557
data := [][]byte{b}
555-
result, err = ms.AppendRows(ctx, data)
558+
result, err = ms.AppendRows(ctx, data, WithOffset(curOffset))
556559
if err != nil {
557560
t.Errorf("single-row append %d failed: %v", k, err)
558561
}
562+
curOffset = curOffset + int64(len(data))
559563
}
560564
// wait for the result to indicate ready, then validate.
561565
_, err = result.GetResult(ctx)
@@ -572,9 +576,30 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
572576
t.Errorf("failed to evolve table schema: %v", err)
573577
}
574578

575-
// TODO: we need a more elegant mechanism for detecting when the backend has registered the schema change.
576-
// In the continuous case, we'd get it from the response, but the change-and-wait case needs something more.
577-
time.Sleep(6 * time.Second)
579+
// Resend latest row until we get a new schema notification.
580+
// It _should_ be possible to send duplicates, but this currently will not propagate the schema error.
581+
// Internal issue: b/211899346
582+
for {
583+
resp, err := ms.AppendRows(ctx, [][]byte{latestRow}, WithOffset(curOffset))
584+
if err != nil {
585+
t.Errorf("got error on dupe append: %v", err)
586+
break
587+
}
588+
curOffset = curOffset + 1
589+
if err != nil {
590+
t.Errorf("got error on offset %d: %v", curOffset, err)
591+
break
592+
}
593+
s, err := resp.UpdatedSchema(ctx)
594+
if err != nil {
595+
t.Errorf("getting schema error: %v", err)
596+
break
597+
}
598+
if s != nil {
599+
break
600+
}
601+
602+
}
578603

579604
// ready descriptor, send an additional append
580605
m2 := &testdata.SimpleMessageEvolvedProto2{
@@ -587,7 +612,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
587612
if err != nil {
588613
t.Errorf("failed to marshal evolved message: %v", err)
589614
}
590-
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto))
615+
result, err = ms.AppendRows(ctx, [][]byte{b}, UpdateSchemaDescriptor(descriptorProto), WithOffset(curOffset))
591616
if err != nil {
592617
t.Errorf("failed evolved append: %v", err)
593618
}
@@ -597,7 +622,7 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
597622
}
598623

599624
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
600-
withExactRowCount(int64(len(testSimpleData)+1)),
625+
withExactRowCount(int64(curOffset+1)),
601626
withNullCount("name", 0),
602627
withNonNullCount("other", 1),
603628
)

Diff for: bigquery/storage/managedwriter/managed_stream.go

+5
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsCl
400400
}
401401
recordStat(ctx, AppendResponses, 1)
402402

403+
// Retain the updated schema if present, for eventual presentation to the user.
404+
if resp.GetUpdatedSchema() != nil {
405+
nextWrite.result.updatedSchema = resp.GetUpdatedSchema()
406+
}
407+
403408
if status := resp.GetError(); status != nil {
404409
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
405410
if err != nil {

0 commit comments

Comments
 (0)