Skip to content

Commit

Permalink
spanner: retry aborted PDMLs
Browse files Browse the repository at this point in the history
Partitioned DML transactions can be aborted by Cloud Spanner.
Until now, the client library did not automatically retry these
aborted transactions. The Java client library already retries them,
and the Spanner team requested that the same behavior be added to
the Go library as well.

Fixes #1536

Change-Id: Ibcb7a61cb5827880d3adff55f65b3d4ae8c6f573
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44171
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
  • Loading branch information
olavloite committed Sep 9, 2019
1 parent 436cd0a commit 02b9199
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 66 deletions.
1 change: 1 addition & 0 deletions internal/kokoro/vet.sh
Expand Up @@ -62,6 +62,7 @@ golint ./... 2>&1 | ( \
grep -v "exported func Value returns unexported type pretty.val, which can be annoying to use" | \
grep -v "exported func Increment returns unexported type firestore.increment, which can be annoying to use" | \
grep -v "ExecuteStreamingSql" | \
grep -v "MethodExecuteSql should be MethodExecuteSQL" | \
grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" | \
grep -v "ClusterId" | \
grep -v "InstanceId" | \
Expand Down
20 changes: 0 additions & 20 deletions spanner/client_test.go
Expand Up @@ -383,26 +383,6 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) {
}
}

// TestClient_PartitionedUpdate checks that PartitionedUpdate does not retry
// on Aborted: the Aborted error must be returned to the caller.
func TestClient_PartitionedUpdate(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	// PartitionedDML transactions are not committed. The simulated Aborted
	// error is registered for the ExecuteStreamingSql method.
	aborted := SimulatedExecutionTime{
		Errors: []error{gstatus.Error(codes.Aborted, "Transaction aborted")},
	}
	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, aborted)

	_, err := client.PartitionedUpdate(context.Background(), NewStatement(UpdateBarSetFoo))
	if err == nil {
		t.Fatalf("Missing expected Aborted exception")
	}
	if code := gstatus.Code(err); code != codes.Aborted {
		t.Fatalf("Got unexpected error %v, expected Aborted", err)
	}
}

func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) {
_, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down
9 changes: 3 additions & 6 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -57,6 +57,7 @@ const (
MethodCreateSession string = "CREATE_SESSION"
MethodDeleteSession string = "DELETE_SESSION"
MethodGetSession string = "GET_SESSION"
MethodExecuteSql string = "EXECUTE_SQL"
MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL"
)

Expand Down Expand Up @@ -563,13 +564,9 @@ func (s *inMemSpannerServer) DeleteSession(ctx context.Context, req *spannerpb.D
}

func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) {
s.mu.Lock()
if s.stopped {
s.mu.Unlock()
return nil, gstatus.Error(codes.Unavailable, "server has been stopped")
if err := s.simulateExecutionTime(MethodExecuteSql, req); err != nil {
return nil, err
}
s.receivedRequests <- req
s.mu.Unlock()
if req.Session == "" {
return nil, gstatus.Error(codes.InvalidArgument, "Missing session name")
}
Expand Down
96 changes: 56 additions & 40 deletions spanner/pdml.go
Expand Up @@ -16,10 +16,9 @@ package spanner

import (
"context"
"time"

"cloud.google.com/go/internal/trace"
"google.golang.org/api/iterator"
"github.com/googleapis/gax-go/v2"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
)
Expand All @@ -38,66 +37,83 @@ func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (co
if err := checkNestedTxn(ctx); err != nil {
return 0, err
}

var (
tx transactionID
s *session
sh *sessionHandle
)
// Create session.
// Create a session that will be used only for this request.
sc := c.rrNext()
s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
if err != nil {
return 0, toSpannerError(err)
}
// Delete the session at the end of the request. If the PDML statement
// timed out or was cancelled, the DeleteSession request might not succeed,
// but the session will eventually be garbage collected by the server.
defer s.delete(ctx)
sh = &sessionHandle{session: s}
// Begin transaction.
res, err := sc.BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
},
})
if err != nil {
return 0, toSpannerError(err)
}
// Create the parameters and the SQL request, but without a transaction.
// The transaction reference will be added by the executePdml method.
params, paramTypes, err := statement.convertParams()
if err != nil {
return 0, toSpannerError(err)
}
tx = res.Id

req := &sppb.ExecuteSqlRequest{
Session: sh.getID(),
Transaction: &sppb.TransactionSelector{
Selector: &sppb.TransactionSelector_Id{Id: tx},
},
Session: sh.getID(),
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
}
rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
req.ResumeToken = resumeToken
return sc.ExecuteStreamingSql(ctx, req)
}
iter := stream(contextWithOutgoingMetadata(ctx, sh.getMetadata()),
rpc, func(time.Time) {}, func(error) {})
// TODO(jba): factor out the following code from here and ReadWriteTransaction.Update.
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return 0, toSpannerError(err)

// Make a retryer for Aborted errors.
// TODO: use generic Aborted retryer when merged with master
retryer := gax.OnCodes([]codes.Code{codes.Aborted}, DefaultRetryBackoff)
// Execute the PDML and retry if the transaction is aborted.
executePdmlWithRetry := func(ctx context.Context) (int64, error) {
for {
count, err := executePdml(ctx, sh, req)
if err == nil {
return count, nil
}
delay, shouldRetry := retryer.Retry(err)
if !shouldRetry {
return 0, err
}
if err := gax.Sleep(ctx, delay); err != nil {
return 0, err
}
}
time.Sleep(time.Second)
}
return executePdmlWithRetry(ctx)
}

if !iter.sawStats {
return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", statement.SQL)
// executePdml runs a single attempt of a Partitioned DML statement on the
// session referenced by sh. It executes the following steps:
//  1. Begin a PDML transaction.
//  2. Add the ID of the PDML transaction to the SQL request.
//  3. Execute the update statement on the PDML transaction.
//
// Note that PDML transactions cannot be committed or rolled back.
//
// req is modified in place: its Transaction field is overwritten with a
// selector for the transaction begun by this call, so the same request value
// can be passed again for a later attempt.
//
// It returns the row count extracted from the result set statistics. Errors
// from BeginTransaction are converted with toSpannerError. Errors from
// ExecuteSql are returned unconverted (presumably so that the caller's retryer
// can inspect the original status code; confirm before changing). If the
// result set carries no statistics, an InvalidArgument error is returned.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
	// Both RPCs are sent with the session's outgoing metadata attached, the
	// same way BeginTransaction has always been called here.
	ctx = contextWithOutgoingMetadata(ctx, sh.getMetadata())

	// Begin transaction.
	res, err := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
		Session: sh.getID(),
		Options: &sppb.TransactionOptions{
			Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
		},
	})
	if err != nil {
		return 0, toSpannerError(err)
	}

	// Add a reference to the PDML transaction on the ExecuteSql request.
	req.Transaction = &sppb.TransactionSelector{
		Selector: &sppb.TransactionSelector_Id{Id: res.Id},
	}
	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
	if err != nil {
		return 0, err
	}
	// A statement that yields no statistics is rejected as an invalid
	// argument to Update.
	if resultSet.Stats == nil {
		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql)
	}
	return extractRowCount(resultSet.Stats)
}
72 changes: 72 additions & 0 deletions spanner/pdml_test.go
Expand Up @@ -15,11 +15,15 @@
package spanner

import (
"bytes"
"context"
"testing"
"time"

. "cloud.google.com/go/spanner/internal/testutil"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestMockPartitionedUpdate(t *testing.T) {
Expand Down Expand Up @@ -52,3 +56,71 @@ func TestMockPartitionedUpdateWithQuery(t *testing.T) {
t.Errorf("got error %v, want code %s", err, wantCode)
}
}

// TestPartitionedUpdate_Aborted checks that a PDML statement is retried if the
// transaction is aborted, and that the retry runs on a different transaction
// than the first attempt.
func TestPartitionedUpdate_Aborted(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()
	ctx := context.Background()

	// Register a simulated Aborted error for the ExecuteSql method.
	server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{
		Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
	})

	got, err := client.PartitionedUpdate(ctx, NewStatement(UpdateBarSetFoo))
	if err != nil {
		t.Fatal(err)
	}
	if want := int64(UpdateBarSetFooRowCount); got != want {
		t.Errorf("Row count mismatch\ngot: %d\nwant: %d", got, want)
	}

	// Expected request sequence: one session, two begin/execute pairs (the
	// aborted attempt and its retry), and finally the session deletion.
	wantReqs := []interface{}{
		&sppb.CreateSessionRequest{},
		&sppb.BeginTransactionRequest{},
		&sppb.ExecuteSqlRequest{},
		&sppb.BeginTransactionRequest{},
		&sppb.ExecuteSqlRequest{},
		&sppb.DeleteSessionRequest{},
	}
	gotReqs, err := shouldHaveReceived(server.TestSpanner, wantReqs)
	if err != nil {
		t.Fatal(err)
	}

	// The two ExecuteSql requests must reference different transactions.
	firstTx := gotReqs[2].(*sppb.ExecuteSqlRequest).Transaction.GetId()
	secondTx := gotReqs[4].(*sppb.ExecuteSqlRequest).Transaction.GetId()
	if bytes.Equal(firstTx, secondTx) {
		t.Errorf("same transaction used twice, expected two different transactions\ngot tx1: %q\ngot tx2: %q", firstTx, secondTx)
	}
}

// TestPartitionedUpdate_WithDeadline tests that PDML respects a context
// deadline, and that the session that was created is also deleted even though
// the update timed out.
func TestPartitionedUpdate_WithDeadline(t *testing.T) {
	t.Parallel()
	server, client, teardown := setupMockedTestServer(t)
	defer teardown()

	// The context expires after 50ms, while ExecuteSql is configured below to
	// take at least 100ms.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	slowExecute := SimulatedExecutionTime{
		MinimumExecutionTime: 100 * time.Millisecond,
	}
	server.TestSpanner.PutExecutionTime(MethodExecuteSql, slowExecute)

	// The following update will cause a 'Failed to delete session' warning to
	// be logged. This is expected. Once each client has its own logger, we
	// should temporarily turn off logging to prevent this warning from being
	// logged.
	_, err := client.PartitionedUpdate(ctx, NewStatement(UpdateBarSetFoo))
	if err == nil {
		t.Fatalf("missing expected error")
	}
	wantCode := codes.DeadlineExceeded
	if gotCode := status.Code(err); gotCode != wantCode {
		t.Fatalf("got error %v, want code %s", err, wantCode)
	}
}
9 changes: 9 additions & 0 deletions spanner/retry.go
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/googleapis/gax-go/v2"
edpb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
Expand All @@ -32,6 +33,14 @@ const (
retryInfoKey = "google.rpc.retryinfo-bin"
)

// DefaultRetryBackoff is the fallback backoff configuration for retryers,
// used when the server did not return any retry information. The retry period
// starts at 20ms, grows by a factor of 1.3 and is capped at 32s.
var DefaultRetryBackoff = gax.Backoff{
	Multiplier: 1.3,
	Initial:    20 * time.Millisecond,
	Max:        32 * time.Second,
}

// isErrorClosing reports whether the error is generated by gRPC layer talking
// to a closed server.
func isErrorClosing(err error) bool {
Expand Down

0 comments on commit 02b9199

Please sign in to comment.