
Commit 4638e17

feat(bigquery/storage/managedwriter): add state tracking (#4407)
This PR introduces two new types:

AppendResult - tracks the progress of an individual row append through to completion, either success or error. Successful appends may have an associated offset; failed appends will have an associated error. The AppendResult exposes a blocking method users can interrogate.

pendingWrite - handles the state management for a set of rows appended as a group. There is a 1:many relationship between a pendingWrite and its AppendResults, so as a pendingWrite completes, all associated AppendResult references are updated.

Towards: #4366
1 parent 528ffc9 commit 4638e17
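
To make that relationship concrete, here is a minimal hypothetical sketch (not part of this commit) of how the two types fit together. Since pendingWrite and newPendingWrite are unexported, code like this would have to live inside the managedwriter package itself, for example in a test:

package managedwriter

import (
    "context"
    "fmt"
)

// Hypothetical sketch of the 1:many relationship between a pendingWrite
// and its AppendResults, using only the API added in this commit.
func examplePendingWriteFanOut(ctx context.Context) error {
    rows := [][]byte{[]byte("row1"), []byte("row2"), []byte("row3")}

    // A single pendingWrite tracks all three rows as one append request,
    // here starting at an explicit stream offset of 10.
    pw := newPendingWrite(rows, 10)

    // When the append is acknowledged, marking the write done unblocks
    // every associated AppendResult, assigning sequential offsets.
    pw.markDone(10, nil)

    for _, ar := range pw.results {
        offset, err := ar.GetResult(ctx)
        if err != nil {
            return err
        }
        fmt.Println("row appended at offset", offset) // 10, 11, 12
    }
    return nil
}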

File tree: 2 files changed (+244, -0 lines)


Diff for: bigquery/storage/managedwriter/appendresult.go

New file, +127 lines.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
    "context"

    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/wrapperspb"
)

// NoStreamOffset is a sentinel value for signalling we're not tracking
// stream offset (e.g. a default stream which allows simultaneous append streams).
const NoStreamOffset int64 = -1

// AppendResult tracks the status of a single row of data.
type AppendResult struct {
    // rowData contains the serialized row data.
    rowData []byte

    ready chan struct{}

    // if the encapsulating append failed, this will retain a reference to the error.
    err error

    // the stream offset
    offset int64
}

func newAppendResult(data []byte) *AppendResult {
    return &AppendResult{
        ready:   make(chan struct{}),
        rowData: data,
    }
}

// Ready blocks until the append request is completed.
func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready }

// GetResult returns the optional offset of this row, or the associated
// error.
func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) {
    select {
    case <-ctx.Done():
        return 0, ctx.Err()
    case <-ar.Ready():
        return ar.offset, ar.err
    }
}

// pendingWrite tracks state for a set of rows that are part of a single
// append request.
type pendingWrite struct {
    request *storagepb.AppendRowsRequest
    results []*AppendResult

    // this is used by the flow controller.
    reqSize int
}

// newPendingWrite constructs the proto request and attaches references
// to the pending results for later consumption. The reason for this is
// that in the future, we may want to allow row batching to be managed by
// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING
// streams, this should be managed by the user.
func newPendingWrite(appends [][]byte, offset int64) *pendingWrite {

    results := make([]*AppendResult, len(appends))
    for k, r := range appends {
        results[k] = newAppendResult(r)
    }
    pw := &pendingWrite{
        request: &storagepb.AppendRowsRequest{
            Rows: &storagepb.AppendRowsRequest_ProtoRows{
                ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                    Rows: &storagepb.ProtoRows{
                        SerializedRows: appends,
                    },
                },
            },
        },
        results: results,
    }
    if offset > 0 {
        pw.request.Offset = &wrapperspb.Int64Value{Value: offset}
    }
    // We compute the size now for flow controller purposes, though
    // the actual request size may be slightly larger (e.g. the first
    // request in a new stream bears schema and stream id).
    pw.reqSize = proto.Size(pw.request)
    return pw
}

// markDone propagates finalization of an append request to associated
// AppendResult references.
func (pw *pendingWrite) markDone(startOffset int64, err error) {
    curOffset := startOffset
    for _, ar := range pw.results {
        if err != nil {
            ar.err = err
            close(ar.ready)
            continue
        }

        ar.offset = curOffset
        // only advance curOffset if we were given a valid starting offset.
        if startOffset >= 0 {
            curOffset = curOffset + 1
        }
        close(ar.ready)
    }
    // Clear the reference to the request.
    pw.request = nil
}
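
For completeness, a hypothetical sketch (again, not part of this commit) of the two remaining paths in the code above: markDone with a non-nil error propagates that same error to every AppendResult in the group, and GetResult honors context cancellation if a write is never completed:

package managedwriter

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Hypothetical illustration of the failure and cancellation paths
// exposed by AppendResult.
func exampleFailurePaths() {
    rows := [][]byte{[]byte("a"), []byte("b")}

    // Failure path: markDone with an error finalizes every AppendResult
    // in the group with that same error.
    pw := newPendingWrite(rows, NoStreamOffset)
    appendErr := errors.New("stream write failed")
    pw.markDone(NoStreamOffset, appendErr)
    for i, ar := range pw.results {
        _, err := ar.GetResult(context.Background())
        fmt.Printf("row %d: err=%v\n", i, err) // err == appendErr for both rows
    }

    // Cancellation path: if a write is never marked done, GetResult returns
    // once the caller's context expires rather than blocking forever.
    stuck := newPendingWrite(rows, NoStreamOffset)
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    if _, err := stuck.results[0].GetResult(ctx); err != nil {
        fmt.Println("gave up waiting:", err) // context.DeadlineExceeded
    }
}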

Diff for: bigquery/storage/managedwriter/appendresult_test.go

New file, +117 lines.
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
    "bytes"
    "fmt"
    "testing"
    "time"
)

func TestAppendResult(t *testing.T) {

    wantRowBytes := []byte("rowdata")

    gotAR := newAppendResult(wantRowBytes)
    if !bytes.Equal(gotAR.rowData, wantRowBytes) {
        t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes)
    }
}

func TestPendingWrite(t *testing.T) {
    wantRowData := [][]byte{
        []byte("row1"),
        []byte("row2"),
        []byte("row3"),
    }

    var wantOffset int64 = 99

    // first, verify no offset behavior
    pending := newPendingWrite(wantRowData, NoStreamOffset)
    if pending.request.GetOffset() != nil {
        t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue())
    }
    pending.markDone(NoStreamOffset, nil)
    for k, ar := range pending.results {
        if ar.offset != NoStreamOffset {
            t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset)
        }
        if ar.err != nil {
            t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)
        }
    }

    // now, verify behavior with a valid offset
    pending = newPendingWrite(wantRowData, 99)
    if pending.request.GetOffset() == nil {
        t.Errorf("offset not set, should be %d", wantOffset)
    }
    if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset {
        t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset)
    }

    // check request shape
    gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows())
    if gotRowCount != len(wantRowData) {
        t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData))
    }

    // verify child AppendResults
    if len(pending.results) != len(wantRowData) {
        t.Errorf("mismatch in rows and append results. %d rows, %d AppendResults", len(wantRowData), len(pending.results))
    }
    for k, ar := range pending.results {
        gotData := ar.rowData
        if !bytes.Equal(gotData, wantRowData[k]) {
            t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
        }
        select {
        case <-ar.Ready():
            t.Errorf("got Ready() on incomplete AppendResult %d", k)
        case <-time.After(100 * time.Millisecond):
            continue
        }
    }

    // verify completion behavior
    reportedOffset := int64(101)
    wantErr := fmt.Errorf("foo")
    pending.markDone(reportedOffset, wantErr)

    if pending.request != nil {
        t.Errorf("expected request to be cleared, is present: %#v", pending.request)
    }
    for k, ar := range pending.results {
        gotData := ar.rowData
        if !bytes.Equal(gotData, wantRowData[k]) {
            t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k])
        }
        select {
        case <-ar.Ready():
            continue
        case <-time.After(100 * time.Millisecond):
            t.Errorf("possible blocking on completed AppendResult %d", k)
        }

        if ar.offset != reportedOffset+int64(k) {
            t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k))
        }
        if ar.err != wantErr {
            t.Errorf("mismatch in errors, got %v want %v", ar.err, wantErr)
        }
    }

}
