From 4a4cd86bb8c4defa3fa51b9238242c4e4ad537bf Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Thu, 16 Jan 2020 09:36:53 +0100 Subject: [PATCH] spanner: retry SessionNotFound on BeginTransaction for read-only tx 'Session not found' errors on BeginTransaction calls for a read-only transaction should be retried on a new session, and the invalid session should be removed from the session pool. Updates #1527. Change-Id: I49a6cb5e096c8b93c7aec76cdbd1c3d640f50c0d Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/50510 Reviewed-by: kokoro Reviewed-by: Hengfeng Li --- spanner/client_test.go | 51 +++++++++++++++++++++++++++++++++++++ spanner/transaction.go | 42 +++++++++++++++++------------- spanner/transaction_test.go | 23 ++++++++++------- 3 files changed, 90 insertions(+), 26 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 36cf0b9f6d48..f04b79059d52 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -581,6 +581,57 @@ func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgument } } +func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) { + t.Parallel() + if err := testReadOnlyTransaction( + t, + map[string]SimulatedExecutionTime{ + MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}}, + }, + ); err != nil { + t.Fatal(err) + } +} + +func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOneSession(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServerWithConfig( + t, + ClientConfig{ + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 0, + MaxOpened: 1, + }, + }) + defer teardown() + server.TestSpanner.PutExecutionTime( + MethodBeginTransaction, + SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}}, + ) + tx := client.ReadOnlyTransaction() + defer tx.Close() + ctx := context.Background() + if err := executeSingerQuery(ctx, tx); err != nil { + t.Fatal(err) + } +} + +func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) { + t.Parallel() + // 'Session not found' is not retryable for a query in the middle of a + // read-only transaction, as it would require restarting a new transaction + // on a new session, and thereby breaking transaction atomicity. + err := testReadOnlyTransaction( + t, + map[string]SimulatedExecutionTime{ + MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.NotFound, "Session not found")}}, + }, + ) + if !isSessionNotFoundError(err) { + t.Fatalf("error mismatch\nWant: %v\nGot: %v", status.Errorf(codes.NotFound, "Session not found"), err) + } +} + func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error { server, client, teardown := setupMockedTestServer(t) defer teardown() diff --git a/spanner/transaction.go b/spanner/transaction.go index ffd55d6d057e..ec1002dd1c35 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -380,6 +380,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { rts time.Time sh *sessionHandle err error + res *sppb.Transaction ) defer func() { if !locked { @@ -404,25 +405,32 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { sh.recycle() } }() - sh, err = t.sp.take(ctx) - if err != nil { - return err - } - res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ - Session: sh.getID(), - Options: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_ReadOnly_{ - ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), + // Retry the BeginTransaction call if a 'Session not found' is returned. + for { + sh, err = t.sp.take(ctx) + if err != nil { + return err + } + res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ + Session: sh.getID(), + Options: &sppb.TransactionOptions{ + Mode: &sppb.TransactionOptions_ReadOnly_{ + ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), + }, }, - }, - }) - if err == nil { - tx = res.Id - if res.ReadTimestamp != nil { - rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) + }) + if isSessionNotFoundError(err) { + sh.destroy() + continue + } else if err == nil { + tx = res.Id + if res.ReadTimestamp != nil { + rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) + } + } else { + err = toSpannerError(err) } - } else { - err = toSpannerError(err) + break } t.mu.Lock() diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index fbd8bb47d217..3fb39552a908 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -195,8 +195,8 @@ func TestApply_RetryOnAbort(t *testing.T) { } } -// Tests that NotFound errors cause failures, and aren't retried. -func TestTransaction_NotFound(t *testing.T) { +// Tests that SessionNotFound errors are retried. +func TestTransaction_SessionNotFound(t *testing.T) { t.Parallel() ctx := context.Background() server, client, teardown := setupMockedTestServer(t) @@ -209,27 +209,32 @@ func TestTransaction_NotFound(t *testing.T) { }) server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{ - Errors: []error{serverErr, serverErr, serverErr}, + Errors: []error{serverErr}, }) - wantErr := toSpannerError(serverErr) txn := client.ReadOnlyTransaction() defer txn.Close() + var wantErr error if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) { - t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr) + t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr) } - // The failure should recycle the session, we expect it to be used in - // following requests. + // The server error should lead to a retry of the BeginTransaction call and + // a valid session handle to be returned that will be used by the following + // requests. Note that calling txn.Query(...) does not actually send the + // query to the (mock) server. That is done at the first call to + // RowIterator.Next. The following statement only verifies that the + // transaction is in a valid state and received a valid session handle. if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) { - t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr) + t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr) } if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) { - t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr) + t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr) } + wantErr = toSpannerError(serverErr) ms := []*Mutation{ Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),