From 834c27de46b059f9efd3bf227991c2d80addfb9e Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 9 Jul 2021 21:25:51 +0000 Subject: [PATCH 1/6] feat(bigquery/storage/managedwriter): add state tracking types This PR introduces two new types: AppendResult - tracks the progress of an individual row append to completion, either success or error. Successful appends _may_ have an associated offset, failed appends will have an associated error. The AppendResult has a blocking method users can interrogate. pendingWrite - handles the state management for a set of rows appended as a group. There's a 1:many relationship between pendingWrite:AppendResult(s), so as a pendingWrite completes all associated AppendResult references should be updated. --- .../storage/managedwriter/appendresult.go | 132 ++++++++++++++++++ .../managedwriter/appendresult_test.go | 104 ++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 bigquery/storage/managedwriter/appendresult.go create mode 100644 bigquery/storage/managedwriter/appendresult_test.go diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go new file mode 100644 index 000000000000..92e9b1d2ca22 --- /dev/null +++ b/bigquery/storage/managedwriter/appendresult.go @@ -0,0 +1,132 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package managedwriter + +import ( + "context" + + storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +// AppendResult tracks the status of a single row of data. +type AppendResult struct { + // rowData contains the serialized row data. + rowData []byte + + ready chan struct{} + + // if the encapsulating append failed, this will retain a reference to the error. + err error + + // the stream offset + offset int64 +} + +func newAppendResult(data []byte) *AppendResult { + return &AppendResult{ + ready: make(chan struct{}), + rowData: data, + } +} + +// Ready blocks until the append request is completed. +func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready } + +// GetResult returns the optional offset of this row, or the associated +// error. +func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { + select { + case <-ar.Ready(): + return ar.offset, ar.err + default: + } + + select { + case <-ctx.Done(): + return 0, ctx.Err() + case <-ar.Ready(): + return ar.offset, ar.err + } +} + +// pendingWrite tracks state for a set of rows that are part of a single +// append request. +type pendingWrite struct { + request *storagepb.AppendRowsRequest + results []*AppendResult + + // this is used by the flow controller. + reqSize int +} + +// newPendingWrite constructs the proto request and attaches references +// to the pending results for later consumption. The reason for this is +// that in the future, we may want to allow row batching to be managed by +// the server (e.g. for default/COMMITTED streams). For BUFFERED/PENDING +// streams, this should be managed by the user. 
+func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { + + results := make([]*AppendResult, len(appends)) + for k, r := range appends { + results[k] = newAppendResult(r) + } + pw := &pendingWrite{ + request: &storagepb.AppendRowsRequest{ + Rows: &storagepb.AppendRowsRequest_ProtoRows{ + ProtoRows: &storagepb.AppendRowsRequest_ProtoData{ + Rows: &storagepb.ProtoRows{ + SerializedRows: appends, + }, + }, + }, + }, + results: results, + } + if offset > 0 { + pw.request.Offset = &wrapperspb.Int64Value{Value: offset} + } + // We compute the size now for flow controller purposes, though + // the actual request size may be slightly larger (e.g. the first + // request in a new stream bears schema and stream id). + pw.reqSize = proto.Size(pw.request) + return pw +} + +// noOffsetSpecified is a sentinel value for signalling we're not tracking +// stream offset (e.g. a default stream which allows simultaneous append streams). +var noOffsetSpecified int64 = -1 + +// markDone propagates finalization of an append request to associated +// AppendResult references. +func (pw *pendingWrite) markDone(startOffset int64, err error) { + curOffset := startOffset + for _, ar := range pw.results { + if err != nil { + ar.err = err + close(ar.ready) + continue + } + + ar.offset = curOffset + // only advance curOffset if we were given a valid starting offset. + if startOffset >= 0 { + curOffset = curOffset + 1 + } + close(ar.ready) + } + // Clear the reference to the request. + pw.request = nil +} diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go new file mode 100644 index 000000000000..7cfc726f6f93 --- /dev/null +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -0,0 +1,104 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package managedwriter + +import ( + "bytes" + "fmt" + "testing" + "time" +) + +func TestAppendResult(t *testing.T) { + + wantRowBytes := []byte("rowdata") + + gotAR := newAppendResult(wantRowBytes) + if !bytes.Equal(gotAR.rowData, wantRowBytes) { + t.Errorf("mismatch in row data, got %q want %q", gotAR.rowData, wantRowBytes) + } +} + +func TestPendingWrite(t *testing.T) { + wantRowData := [][]byte{ + []byte("row1"), + []byte("row2"), + []byte("row3"), + } + + var wantOffset int64 = 99 + + // first, verify no offset behavior + pending := newPendingWrite(wantRowData, noOffsetSpecified) + if pending.request.GetOffset() != nil { + t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) + } + + // now, verify behavior with a valid offset + pending = newPendingWrite(wantRowData, 99) + if pending.request.GetOffset() == nil { + t.Errorf("offset not set, should be %d", wantOffset) + } + if gotOffset := pending.request.GetOffset().GetValue(); gotOffset != wantOffset { + t.Errorf("offset mismatch, got %d want %d", gotOffset, wantOffset) + } + + // check request shape + gotRowCount := len(pending.request.GetProtoRows().GetRows().GetSerializedRows()) + if gotRowCount != len(wantRowData) { + t.Errorf("pendingWrite request mismatch, got %d rows, want %d rows", gotRowCount, len(wantRowData)) + } + + // verify child AppendResults + if len(pending.results) != len(wantRowData) { + t.Errorf("mismatch in rows and append results. 
%d rows, %d AppendResults", len(wantRowData), len(pending.results)) + } + for k, ar := range pending.results { + gotData := ar.rowData + if !bytes.Equal(gotData, wantRowData[k]) { + t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k]) + } + select { + case <-ar.Ready(): + t.Errorf("got Ready() on incomplete AppendResult %d", k) + case <-time.After(100 * time.Millisecond): + continue + } + } + + // verify completion behavior + reportedOffset := int64(101) + wantErr := fmt.Errorf("foo") + pending.markDone(reportedOffset, wantErr) + + if pending.request != nil { + t.Errorf("expected request to be cleared, is present: %#v", pending.request) + } + for k, ar := range pending.results { + gotData := ar.rowData + if !bytes.Equal(gotData, wantRowData[k]) { + t.Errorf("row %d mismatch in data: got %q want %q", k, gotData, wantRowData[k]) + } + select { + case <-ar.Ready(): + continue + case <-time.After(100 * time.Millisecond): + t.Errorf("possible blocking on completed AppendResult %d", k) + } + if ar.offset != reportedOffset+int64(k) { + t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k)) + } + } + +} From fffb8eafc3768c38df30b05d953b2525c8f99461 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 9 Jul 2021 21:34:05 +0000 Subject: [PATCH 2/6] more testing --- bigquery/storage/managedwriter/appendresult_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 7cfc726f6f93..55eedb329423 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -99,6 +99,9 @@ func TestPendingWrite(t *testing.T) { if ar.offset != reportedOffset+int64(k) { t.Errorf("mismatch on completed AppendResult offset: got %d want %d", ar.offset, reportedOffset+int64(k)) } + if ar.err != wantErr { + t.Errorf("mismatch in errors, got %v want 
%v", ar.err, wantErr) + } } } From 432589d56a8a7444bcc622ba7c80b646d185e044 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 9 Jul 2021 21:56:11 +0000 Subject: [PATCH 3/6] detach header comment from package name --- bigquery/storage/managedwriter/appendresult.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 92e9b1d2ca22..41c538fae6d0 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + package managedwriter import ( From e0e6fbf7cf8de695e06e6b521a6b2cc378836c92 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Fri, 9 Jul 2021 21:57:18 +0000 Subject: [PATCH 4/6] and in test --- bigquery/storage/managedwriter/appendresult_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 55eedb329423..eab7fb600cb0 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
+ package managedwriter import ( From 686ba6ad4063aef8dbf4adaf068b170fed1795bb Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 12 Jul 2021 18:22:53 +0000 Subject: [PATCH 5/6] Address reviewer feedback --- bigquery/storage/managedwriter/appendresult.go | 14 ++++---------- .../storage/managedwriter/appendresult_test.go | 11 ++++++++++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index 41c538fae6d0..a063e72094cc 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -22,6 +22,10 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) +// NoOffset is a sentinel value for signalling we're not tracking +// stream offset (e.g. a default stream which allows simultaneous append streams). +const NoOffset int64 = -1 + // AppendResult tracks the status of a single row of data. type AppendResult struct { // rowData contains the serialized row data. @@ -49,12 +53,6 @@ func (ar *AppendResult) Ready() <-chan struct{} { return ar.ready } // GetResult returns the optional offset of this row, or the associated // error. func (ar *AppendResult) GetResult(ctx context.Context) (int64, error) { - select { - case <-ar.Ready(): - return ar.offset, ar.err - default: - } - select { case <-ctx.Done(): return 0, ctx.Err() @@ -106,10 +104,6 @@ func newPendingWrite(appends [][]byte, offset int64) *pendingWrite { return pw } -// noOffsetSpecified is a sentinel value for signalling we're not tracking -// stream offset (e.g. a default stream which allows simultaneous append streams). -var noOffsetSpecified int64 = -1 - // markDone propagates finalization of an append request to associated // AppendResult references. 
func (pw *pendingWrite) markDone(startOffset int64, err error) { diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index eab7fb600cb0..0d4fe9fa28f5 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -41,10 +41,19 @@ func TestPendingWrite(t *testing.T) { var wantOffset int64 = 99 // first, verify no offset behavior - pending := newPendingWrite(wantRowData, noOffsetSpecified) + pending := newPendingWrite(wantRowData, NoOffset) if pending.request.GetOffset() != nil { t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) } + pending.markDone(NoOffset, nil) + for k, ar := range pending.results { + if ar.offset != NoOffset { + t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoOffset) + } + if ar.err != nil { + t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err) + } + } // now, verify behavior with a valid offset pending = newPendingWrite(wantRowData, 99) From f44e792c185438431cf39018c112bf9a7e404660 Mon Sep 17 00:00:00 2001 From: Seth Hollyman Date: Mon, 12 Jul 2021 19:01:55 +0000 Subject: [PATCH 6/6] rename constant --- bigquery/storage/managedwriter/appendresult.go | 4 ++-- bigquery/storage/managedwriter/appendresult_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/bigquery/storage/managedwriter/appendresult.go b/bigquery/storage/managedwriter/appendresult.go index a063e72094cc..2570d80d7a00 100644 --- a/bigquery/storage/managedwriter/appendresult.go +++ b/bigquery/storage/managedwriter/appendresult.go @@ -22,9 +22,9 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -// NoOffset is a sentinel value for signalling we're not tracking +// NoStreamOffset is a sentinel value for signalling we're not tracking // stream offset (e.g. 
a default stream which allows simultaneous append streams). -const NoOffset int64 = -1 +const NoStreamOffset int64 = -1 // AppendResult tracks the status of a single row of data. type AppendResult struct { diff --git a/bigquery/storage/managedwriter/appendresult_test.go b/bigquery/storage/managedwriter/appendresult_test.go index 0d4fe9fa28f5..c8beeafa7346 100644 --- a/bigquery/storage/managedwriter/appendresult_test.go +++ b/bigquery/storage/managedwriter/appendresult_test.go @@ -41,14 +41,14 @@ func TestPendingWrite(t *testing.T) { var wantOffset int64 = 99 // first, verify no offset behavior - pending := newPendingWrite(wantRowData, NoOffset) + pending := newPendingWrite(wantRowData, NoStreamOffset) if pending.request.GetOffset() != nil { t.Errorf("request should have no offset, but is present: %q", pending.request.GetOffset().GetValue()) } - pending.markDone(NoOffset, nil) + pending.markDone(NoStreamOffset, nil) for k, ar := range pending.results { - if ar.offset != NoOffset { - t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoOffset) + if ar.offset != NoStreamOffset { + t.Errorf("mismatch on completed AppendResult(%d) without offset: got %d want %d", k, ar.offset, NoStreamOffset) } if ar.err != nil { t.Errorf("mismatch in error on AppendResult(%d), got %v want nil", k, ar.err)