From 48b08bf47494052a2652a3ab519de76fddc74c8a Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 29 Aug 2023 11:34:13 -0700 Subject: [PATCH] fix(bigquery/storage/managedwriter): address possible deadlocks (#8507) The context refactoring in #8275 introduced two possible sources of deadlocks in the ManagedStream AppendRows call path. This PR addresses those, and augments deadlock testing to cover this case. Fixes: https://togithub.com/googleapis/google-cloud-go/issues/8505 --- bigquery/storage/managedwriter/managed_stream.go | 14 ++++++++------ .../storage/managedwriter/managed_stream_test.go | 11 +++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 50bae032ab7..faaf9a776a4 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -197,10 +197,11 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) ( func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error { for { ms.mu.Lock() - if ms.err != nil { - return ms.err - } + err := ms.err ms.mu.Unlock() + if err != nil { + return err + } conn, err := ms.pool.selectConn(pw) if err != nil { pw.markDone(nil, err) @@ -291,10 +292,11 @@ func (ms *ManagedStream) buildRequest(data [][]byte) *storagepb.AppendRowsReques func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...AppendOption) (*AppendResult, error) { // before we do anything, ensure the writer isn't closed. ms.mu.Lock() - if ms.err != nil { - return nil, ms.err - } + err := ms.err ms.mu.Unlock() + if err != nil { + return nil, err + } // Ensure we build the request and pending write with a consistent schema version. curSchemaVersion := ms.curDescVersion req := ms.buildRequest(data) diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index d05446b1bac..9a69b2c24e1 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -421,6 +421,17 @@ func TestManagedStream_AppendDeadlocks(t *testing.T) { // Issue two closes, to ensure we're not deadlocking there either. ms.Close() ms.Close() + + // Issue two more appends, ensure we're not deadlocked as the writer is closed. + gotErr = ms.appendWithRetry(pw) + if !errors.Is(gotErr, io.EOF) { + t.Errorf("expected io.EOF, got %v", gotErr) + } + gotErr = ms.appendWithRetry(pw) + if !errors.Is(gotErr, io.EOF) { + t.Errorf("expected io.EOF, got %v", gotErr) + } + } }