Skip to content

Commit

Permalink
spanner: retry SessionNotFound on BeginTransaction for read-only tx
Browse files Browse the repository at this point in the history
'Session not found' errors on BeginTransaction calls for a read-only transaction
should be retried on a new session, and the invalid session should be removed
from the session pool.

Updates #1527.

Change-Id: I49a6cb5e096c8b93c7aec76cdbd1c3d640f50c0d
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/50510
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Hengfeng Li <hengfeng@google.com>
  • Loading branch information
olavloite committed Jan 20, 2020
1 parent 4f99193 commit 4a4cd86
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 26 deletions.
51 changes: 51 additions & 0 deletions spanner/client_test.go
Expand Up @@ -581,6 +581,57 @@ func TestClient_ReadOnlyTransaction_UnavailableOnCreateSessionAndInvalidArgument
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction(t *testing.T) {
t.Parallel()
if err := testReadOnlyTransaction(
t,
map[string]SimulatedExecutionTime{
MethodBeginTransaction: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
},
); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnBeginTransaction_WithMaxOneSession(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServerWithConfig(
t,
ClientConfig{
SessionPoolConfig: SessionPoolConfig{
MinOpened: 0,
MaxOpened: 1,
},
})
defer teardown()
server.TestSpanner.PutExecutionTime(
MethodBeginTransaction,
SimulatedExecutionTime{Errors: []error{status.Error(codes.NotFound, "Session not found")}},
)
tx := client.ReadOnlyTransaction()
defer tx.Close()
ctx := context.Background()
if err := executeSingerQuery(ctx, tx); err != nil {
t.Fatal(err)
}
}

func TestClient_ReadOnlyTransaction_SessionNotFoundOnExecuteStreamingSql(t *testing.T) {
t.Parallel()
// 'Session not found' is not retryable for a query in the middle of a
// read-only transaction, as it would require restarting a new transaction
// on a new session, and thereby breaking transaction atomicity.
err := testReadOnlyTransaction(
t,
map[string]SimulatedExecutionTime{
MethodExecuteStreamingSql: {Errors: []error{status.Error(codes.NotFound, "Session not found")}},
},
)
if !isSessionNotFoundError(err) {
t.Fatalf("error mismatch\nWant: %v\nGot: %v", status.Errorf(codes.NotFound, "Session not found"), err)
}
}

func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down
42 changes: 25 additions & 17 deletions spanner/transaction.go
Expand Up @@ -380,6 +380,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
rts time.Time
sh *sessionHandle
err error
res *sppb.Transaction
)
defer func() {
if !locked {
Expand All @@ -404,25 +405,32 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
sh.recycle()
}
}()
sh, err = t.sp.take(ctx)
if err != nil {
return err
}
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
// Retry the BeginTransaction call if a 'Session not found' is returned.
for {
sh, err = t.sp.take(ctx)
if err != nil {
return err
}
res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
},
},
},
})
if err == nil {
tx = res.Id
if res.ReadTimestamp != nil {
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
})
if isSessionNotFoundError(err) {
sh.destroy()
continue
} else if err == nil {
tx = res.Id
if res.ReadTimestamp != nil {
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}
} else {
err = toSpannerError(err)
}
} else {
err = toSpannerError(err)
break
}
t.mu.Lock()

Expand Down
23 changes: 14 additions & 9 deletions spanner/transaction_test.go
Expand Up @@ -195,8 +195,8 @@ func TestApply_RetryOnAbort(t *testing.T) {
}
}

// Tests that NotFound errors cause failures, and aren't retried.
func TestTransaction_NotFound(t *testing.T) {
// Tests that SessionNotFound errors are retried.
func TestTransaction_SessionNotFound(t *testing.T) {
t.Parallel()
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
Expand All @@ -209,27 +209,32 @@ func TestTransaction_NotFound(t *testing.T) {
})
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{serverErr, serverErr, serverErr},
Errors: []error{serverErr},
})
wantErr := toSpannerError(serverErr)

txn := client.ReadOnlyTransaction()
defer txn.Close()

var wantErr error
if _, _, got := txn.acquire(ctx); !testEqual(wantErr, got) {
t.Fatalf("Expect acquire to fail, got %v, want %v.", got, wantErr)
t.Fatalf("Expect acquire to succeed, got %v, want %v.", got, wantErr)
}

// The failure should recycle the session, we expect it to be used in
// following requests.
// The server error should lead to a retry of the BeginTransaction call and
// a valid session handle to be returned that will be used by the following
// requests. Note that calling txn.Query(...) does not actually send the
// query to the (mock) server. That is done at the first call to
// RowIterator.Next. The following statement only verifies that the
// transaction is in a valid state and received a valid session handle.
if got := txn.Query(ctx, NewStatement("SELECT 1")); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Query to fail, got %v, want %v.", got.err, wantErr)
t.Fatalf("Expect Query to succeed, got %v, want %v.", got.err, wantErr)
}

if got := txn.Read(ctx, "Users", KeySets(Key{"alice"}, Key{"bob"}), []string{"name", "email"}); !testEqual(wantErr, got.err) {
t.Fatalf("Expect Read to fail, got %v, want %v.", got.err, wantErr)
t.Fatalf("Expect Read to succeed, got %v, want %v.", got.err, wantErr)
}

wantErr = toSpannerError(serverErr)
ms := []*Mutation{
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
Expand Down

0 comments on commit 4a4cd86

Please sign in to comment.